为了账号安全,请及时绑定邮箱和手机立即绑定

使用此代码(Paho MQTT)作为 GoRoutine 并通过通道传递消息以通过

使用此代码(Paho MQTT)作为 GoRoutine 并通过通道传递消息以通过

Go
侃侃无极 2023-07-26 17:20:34
作为标准代码,我用来发布消息以进行测试:func main() {    opts := MQTT.NewClientOptions().AddBroker("tcp://127.0.0.1:1883")    opts.SetClientID("myclientid_")    opts.SetDefaultPublishHandler(f)    opts.SetConnectionLostHandler(connLostHandler)    opts.OnConnect = func(c MQTT.Client) {        fmt.Printf("Client connected, subscribing to: test/topic\n")        if token := c.Subscribe("logs", 0, nil); token.Wait() && token.Error() != nil {            fmt.Println(token.Error())            os.Exit(1)        }    }    c := MQTT.NewClient(opts)    if token := c.Connect(); token.Wait() && token.Error() != nil {        panic(token.Error())    }    for i := 0; i < 5; i++ {        text := fmt.Sprintf("this is msg #%d!", i)        token := c.Publish("logs", 0, false, text)        token.Wait()    }    time.Sleep(3 * time.Second)    if token := c.Unsubscribe("logs"); token.Wait() && token.Error() != nil {        fmt.Println(token.Error())        os.Exit(1)    }    c.Disconnect(250)}这个效果很好!但是在执行高延迟任务时大量传递消息,我的程序性能会很低,所以我必须使用 goroutine 和 Channel。这段代码正是我想要的!但作为 Golang 中的菜鸟,我不知道如何START()在主函数中运行函数以及要传递什么参数!特别是,我将如何使用通道将消息传递给工作人员(发布者)?!我们将不胜感激您的帮助!
查看完整描述

2 回答

?
慕斯王

TA贡献1864条经验 获得超2个赞

为什么不将消息发送给一群工作人员呢?


像这样的东西:


...

    const workerPoolSize = 10 // the number of workers you want to have

    wg := &sync.WaitGroup{}

    wCh := make(chan string)

    wg.Add(workerPoolSize) // you want to wait for 10 workers to finish the job


    // run workers in goroutines

    for i := 0; i < workerPoolSize; i++ {

        go func(wch <-chan string) {

            // get the data from the channel

            for text := range wch {

                c.Publish("logs", 0, false, text)

                token.Wait()

            }

            wg.Done() // worker says that he finishes the job

        }(wCh)

    }


    for i := 0; i < 5; i++ {

        // put the data to the channel

        wCh <- fmt.Sprintf("this is msg #%d!", i)

    }


        close(wCh)

    wg.Wait() // wait for all workers to finish

...


查看完整回答
反对 回复 2023-07-26
?
慕姐8265434

TA贡献1813条经验 获得超2个赞

当您说“在执行高延迟任务时大量传递消息”时,我假设您的意思是您想要异步发送消息(因此消息由与主代码运行不同的 go 例程处理)。

如果是这种情况,那么对您的初始示例进行非常简单的更改将为您提供:

for i := 0; i < 5; i++ {

        text := fmt.Sprintf("this is msg #%d!", i)

        token := c.Publish("logs", 0, false, text)

        // comment out... token.Wait()

    }

注意:您的示例代码可能会在消息实际发送之前退出;添加 time.Sleep(10 * time.Second) 会给它时间让它们熄灭;请参阅下面的代码了解处理此问题的另一种方法


您的初始代码在消息发送之前停止的唯一原因是您调用了 token.Wait()。如果您不关心错误(并且您不检查错误,所以我假设您不关心),那么调用 token.Wait() 就没有什么意义(它只是等待消息发送;消息将消失无论你是否调用 token.Wait() )。


如果您想记录任何错误,您可以使用类似以下内容:


for i := 0; i < 5; i++ {

        text := fmt.Sprintf("this is msg #%d!", i)

        token := c.Publish("logs", 0, false, text)

        go func(){

            token.Wait()

            err := token.Error()

            if err != nil {

                fmt.Printf("Error: %s\n", err.Error()) // or whatever you want to do with your error

            }

        }()

    }

请注意,如果消息传递至关重要(但由于您没有检查错误,我假设它不是),您还需要做一些其他事情。


就您找到的代码而言;我怀疑这会增加您不需要的复杂性(并且需要更多信息才能解决此问题;例如,MqttProtocol 结构未在您粘贴的位中定义)。


额外的一点......在您的评论中您提到“发布的消息必须排序”。如果这是必要的(因此您想等到每条消息都已送达后再发送另一条消息),那么您需要类似以下内容:


msgChan := make(chan string, 200) // Allow a queue of up to 200 messages

var wg sync.WaitGroup

wg.Add(1)

go func(){ // go routine to send messages from channel

    for msg := range msgChan {

        token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital

        token.Wait()

        // should check for errors here

    }

    wg.Done()

}()


for i := 0; i < 5; i++ {

        text := fmt.Sprintf("this is msg #%d!", i)

        msgChan <- text

    }

close(msgChan) // this will stop the goroutine (when all messages processed)

wg.Wait() // Wait for all messages to be sent before exiting (may wait for ever is mqtt broker down!)

注意:这与 Ilya Kaznacheev 的解决方案类似(如果将workerPoolSize设置为1并使通道缓冲)


正如您的评论表明等待组使这一点难以理解,这里是另一种可能更清晰的等待方式(等待组通常在您等待多件事情完成时使用;在这个例子中,我们只等待一件事情,所以可以使用更简单的方法)


msgChan := make(chan string, 200) // Allow a queue of up to 200 messages

done := make(chan struct{}) // channel used to indicate when go routine has finnished


go func(){ // go routine to send messages from channel

    for msg := range msgChan {

        token := c.Publish("logs", 2, false, msg) // Use QOS2 is order is vital

        token.Wait()

        // should check for errors here

    }

    close(done) // let main routine know we have finnished

}()


for i := 0; i < 5; i++ {

        text := fmt.Sprintf("this is msg #%d!", i)

        msgChan <- text

    }

close(msgChan) // this will stop the goroutine (when all messages processed)

<-done // wait for publish go routine to complete


查看完整回答
反对 回复 2023-07-26
  • 2 回答
  • 0 关注
  • 130 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信