为了账号安全,请及时绑定邮箱和手机立即绑定

让 go 例程等待 rabbitMQ 发送结果

让 go 例程等待 rabbitMQ 发送结果

Go
噜噜哒 2023-05-08 15:33:15
我是 Go 的新手,我想制作一个管道来翻译我收到的每个请求,方法是将它发送到第一个队列 (TEST),然后从最后一个队列 (RESULT) 中获取最终结果并将其作为响应发回。我面临的问题是,响应永远不会等到所有结果从队列中返回。这是代码:func main() {    requests := []int{3, 4, 5, 6, 7}    var wg sync.WaitGroup    wg.Add(1)    resArr := []string{}    go func() {        for _, r := range requests {            rabbitSend("TEST", r)            resArr = append(resArr, <-rabbitReceive("RESULT"))        }        defer wg.Done()    }()    wg.Wait()    log.Println("Result", resArr)}兔子发送方法:func rabbitSend(queueName string, msg int) {    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")    failOnError(err, "Failed to connect to RabbitMQ")    defer conn.Close()    ch, err := conn.Channel()    failOnError(err, "Failed to open a channel")    defer ch.Close()    q, err := ch.QueueDeclare(        queueName, // name        true,      // durable        false,     // delete when unused        false,     // exclusive        false,     // no-wait        nil,       // arguments    )    failOnError(err, "Failed to declare a queue")    body, _ := json.Marshal(msg)    err = ch.Publish(        "",     // exchange        q.Name, // routing key        false,  // mandatory        false,  // immediate        amqp.Publishing{            ContentType: "application/json",            Body:        []byte(body),        })    log.Printf("[x] Sent %s to %s", body, q.Name)    failOnError(err, "Failed to publish a message")}兔子接收方法:func rabbitReceive(queueName string) <-chan string {    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")    failOnError(err, "Failed to connect to RabbitMQ")    defer conn.Close()    ch, err := conn.Channel()    failOnError(err, "Failed to open a channel")    defer ch.Close()    q, err := ch.QueueDeclare(        queueName, // name        true,      // durable        false,     // delete when usused        false,     // exclusive        false,     // no-waits        nil,       // arguments    )
查看完整描述

2 回答

?
侃侃尔雅

TA贡献1801条经验 获得超15个赞

修改你的func rabbitReceive(queueName string) <-chan string如下:


 func rabbitReceive(queueName string) <-chan string {

    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")

    failOnError(err, "Failed to connect to RabbitMQ")


    ch, err := conn.Channel()

    failOnError(err, "Failed to open a channel")


    q, err := ch.QueueDeclare(

        queueName, // name

        true,      // durable

        false,     // delete when usused

        false,     // exclusive

        false,     // no-waits

        nil,       // arguments

    )

    failOnError(err, "Failed to declare a queue")


    msgs, err := ch.Consume(

        q.Name, // queue

        "",     // consumer

        true,   // auto-ack

        false,  // exclusive

        false,  // no-local

        false,  // no-wait

        nil,    // args

    )

    failOnError(err, "Failed to register a consumer")


    resCh := make(chan string)

    go func() {

        d := <-msgs

        log.Printf("Received a message: %v from %v", string(d.Body), q.Name)

        resCh <- string(d.Body)

        conn.Close()

        ch.Close()

        close(resCh)

    }()

    return resCh

}

以前的代码导致您出现问题的原因是defer ch.Close(). ch在将响应写入之前关闭resCh。


查看完整回答
反对 回复 2023-05-08
?
慕哥6287543

TA贡献1831条经验 获得超10个赞

跟进 @nightfury1204 很好的答案,你确实ch在写信给resCh. 只有一件事,在 go 例程中你想遍历所有消息,所以更好的方法是:


go func() {

        for d := range msgs {

          log.Printf("Received a message: %v from %v", string(d.Body), q.Name)

          resCh <- string(d.Body)  

        }

        conn.Close()

        ch.Close()

        close(resCh)

    }()


查看完整回答
反对 回复 2023-05-08
  • 2 回答
  • 0 关注
  • 113 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信