2 回答
TA贡献1864条经验 获得超2个赞
为什么不将消息发送给一群工作人员呢?
像这样的东西:
...
const workerPoolSize = 10 // the number of workers you want to have
wg := &sync.WaitGroup{}
wCh := make(chan string)
wg.Add(workerPoolSize) // you want to wait for 10 workers to finish the job
// run workers in goroutines
for i := 0; i < workerPoolSize; i++ {
go func(wch <-chan string) {
// get the data from the channel
for text := range wch {
c.Publish("logs", 0, false, text)
token.Wait()
}
wg.Done() // worker says that he finishes the job
}(wCh)
}
for i := 0; i < 5; i++ {
// put the data to the channel
wCh <- fmt.Sprintf("this is msg #%d!", i)
}
close(wCh)
wg.Wait() // wait for all workers to finish
...
TA贡献1813条经验 获得超2个赞
当您说“在执行高延迟任务时大量传递消息”时,我假设您的意思是您想要异步发送消息(因此消息由与主代码运行不同的 go 例程处理)。
如果是这种情况,那么对您的初始示例进行非常简单的更改将为您提供:
for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
token := c.Publish("logs", 0, false, text)
// comment out... token.Wait()
}
注意:您的示例代码可能会在消息实际发送之前退出;添加 time.Sleep(10 * time.Second) 会给它时间让它们熄灭;请参阅下面的代码了解处理此问题的另一种方法
您的初始代码在消息发送之前停止的唯一原因是您调用了 token.Wait()。如果您不关心错误(并且您不检查错误,所以我假设您不关心),那么调用 token.Wait() 就没有什么意义(它只是等待消息发送;消息将消失无论你是否调用 token.Wait() )。
如果您想记录任何错误,您可以使用类似以下内容:
for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
token := c.Publish("logs", 0, false, text)
go func(){
token.Wait()
err := token.Error()
if err != nil {
fmt.Printf("Error: %s\n", err.Error()) // or whatever you want to do with your error
}
}()
}
请注意,如果消息传递至关重要(但由于您没有检查错误,我假设它不是),您还需要做一些其他事情。
就您找到的代码而言;我怀疑这会增加您不需要的复杂性(并且需要更多信息才能解决此问题;例如,MqttProtocol 结构未在您粘贴的位中定义)。
额外的一点......在您的评论中您提到“发布的消息必须排序”。如果这是必要的(因此您想等到每条消息都已送达后再发送另一条消息),那么您需要类似以下内容:
msgChan := make(chan string, 200) // Allow a queue of up to 200 messages
var wg sync.WaitGroup
wg.Add(1)
go func(){ // go routine to send messages from channel
for msg := range msgChan {
token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital
token.Wait()
// should check for errors here
}
wg.Done()
}()
for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
msgChan <- text
}
close(msgChan) // this will stop the goroutine (when all messages processed)
wg.Wait() // Wait for all messages to be sent before exiting (may wait for ever is mqtt broker down!)
注意:这与 Ilya Kaznacheev 的解决方案类似(如果将workerPoolSize设置为1并使通道缓冲)
正如您的评论表明等待组使这一点难以理解,这里是另一种可能更清晰的等待方式(等待组通常在您等待多件事情完成时使用;在这个例子中,我们只等待一件事情,所以可以使用更简单的方法)
msgChan := make(chan string, 200) // Allow a queue of up to 200 messages
done := make(chan struct{}) // channel used to indicate when go routine has finnished
go func(){ // go routine to send messages from channel
for msg := range msgChan {
token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital
token.Wait()
// should check for errors here
}
close(done) // let main routine know we have finnished
}()
for i := 0; i < 5; i++ {
text := fmt.Sprintf("this is msg #%d!", i)
msgChan <- text
}
close(msgChan) // this will stop the goroutine (when all messages processed)
<-done // wait for publish go routine to complete
- 2 回答
- 0 关注
- 121 浏览
添加回答
举报