1 回答
TA贡献2051条经验 获得超10个赞
在深入研究您的解决方案及其问题之前,让我再次推荐此答案中介绍的另一种 Broker 方法:如何使用通道广播消息
现在进入您的解决方案。
每当你启动 goroutine 时,请始终考虑它将如何结束,并确保如果 goroutine 不应该在应用的生命周期内运行,请确保它确实如此。
// ISSUE1: These goroutines apparently do not exit properly...
go func(ch chan interface{}) {
ch <- msg
}(sub.Data)
此 goroutine 尝试在 上发送值。这可能是一个阻塞操作:如果 的缓冲区已满并且 上没有现成的接收器,它将阻塞。这是脱离了发射的goroutine的控制,也脱离了对包装的控制。在某些情况下,这可能很好,但这已经给软件包的用户带来了负担。尽量避免这些。尝试创建易于使用且难以滥用的 API。chchchpubsub
此外,仅仅为了在频道上发送价值而启动 goroutine 是一种资源浪费(goroutine 既便宜又轻便,但你不应该尽可能地向它们发送垃圾邮件)。
你这样做是因为你不想被阻止。为避免阻塞,您可以使用具有“合理”高缓冲器的缓冲通道。是的,这并不能解决阻塞问题,只能帮助“慢速”客户端从通道接收。
要“真正”避免在不启动 goroutine 的情况下阻塞,您可以使用非阻塞发送:
select {
case ch <- msg:
default:
// ch's buffer is full, we cannot deliver now
}
如果发送可以继续,它将发生。如果没有,则立即选择分支。你必须决定该怎么做。“丢失”消息是否可以接受?等到“放弃”可以接受一段时间吗?或者是否可以启动一个goroutine来执行此操作(但随后您将回到我们在这里尝试解决的问题)?或者,在客户端可以从通道接收之前,是否可以被阻止...chdefault
选择合理的高缓冲区,如果遇到它仍然变满的情况,在客户端可以前进并从消息接收之前,阻止可能是可以接受的。如果不能,则整个应用可能处于不可接受的状态,并且“挂起”或“崩溃”可能是可以接受的。
// ISSUE2: close the channel async with a delay to ensure
// nothing will be written to the channel anymore
// via a pending goroutine from Publish()
go func(ch chan interface{}) {
time.Sleep(500 * time.Millisecond)
close(ch)
}(s.Data)
关闭通道是向接收器发出的信号,表示通道上不会发送更多值。因此,关闭通道始终是发送者的工作(和责任)。启动 goroutine 以关闭通道,您将该工作和责任“移交给”另一个不会与发送方同步的“实体”(goroutine)。这可能很容易导致死机(在闭合通道上发送是运行时死机,有关其他公理,请参阅未初始化的通道如何工作?)。别这样。
是的,这是必要的,因为您启动了goroutines来发送。如果你不这样做,那么你可以“就地”关闭,而不启动goroutine,因为这样发送者和关闭者将是同一个实体:它本身,其发送和关闭操作受互斥锁保护。因此,解决第一个问题自然而然地解决了第二个问题。Pubsub
通常,如果一个通道有多个发送方,则必须协调关闭通道。必须有一个实体(通常不是任何发送方)等待所有发送方完成,实际上使用 一个 ,然后该单个实体可以安全地关闭通道。请参阅关闭长度未知的通道。sync.WaitGroup
- 1 回答
- 0 关注
- 59 浏览
添加回答
举报