为了账号安全,请及时绑定邮箱和手机立即绑定

RabbitMQ 队列长度始终为 0

RabbitMQ 队列长度始终为 0

Go
万千封印 2022-10-17 15:43:51
我正在编写一个应用程序,我遇到了这个问题,一遍又一遍地查看代码,似乎没有任何问题,用下面的基本代码片段测试,问题是可重现的...... RabbitMQ 说队列总是空的不是。下面的 Golang 代码片段显示了生产者发送消息的频率高于消费者消费消息的频率。消费者始终处于活动状态,但睡眠时间更长,以使队列在其积压中具有消息。结果?消费者每次尝试都会获取消息,但是 API 总是说没有消息 -> 消息计数为 0。package mainimport (    "encoding/json"    "fmt"    "github.com/streadway/amqp"    "io/ioutil"    "net/http"    "testing"    "time")func main() {    username := "guest"    password := "guest"    scheme := "amqp"    rabbitMqHost := "localhost"    port := "5672"    connectionString := fmt.Sprintf("%s://%s:%s@%s:%s/", scheme, username, password, rabbitMqHost, port)    conn, err := amqp.Dial(connectionString)    if err != nil {        panic(err)    }    ch, err := conn.Channel()    if err != nil {        panic(err)    }    exchangeName := "my-exchange"    // Declare exchange    err = ch.ExchangeDeclare(        exchangeName, // name        "fanout",     // type        true,         // durable        true,         // auto-deleted        false,        // internal        false,        // no-wait        nil,          // arguments    )    if err != nil {        panic(err)    }    // Create first Queue    queueName := "my-queue"    q, err := ch.QueueDeclare(        queueName, // name        true,      // durable        true,      // delete when unsused        false,     // exclusive        false,     // no-wait        nil,       // arguments    )    if err != nil {        panic(err)    }    // Bind Exchange to Queue    err = ch.QueueBind(        q.Name,       // queue name        "",           // routing key        exchangeName, // exchange        false,        nil,    )    // Listen    eventQueue, err := ch.Consume(        q.Name, // queue        "",     // consumer        true,   // auto-ack        false,  // exclusive        false,  // no-local        false,  // no-wait        nil,    // args    )    if err != nil {        panic(err)    }}您可以使用以下 RabbitMQ 服务器进行测试:docker run --rm --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
查看完整描述

2 回答

?
四季花海

TA贡献1811条经验 获得超5个赞

该字段q2.Messages不可靠,它是未等待确认的消息的计数,即已确认的消息。

你的消费者被声明为autoAck = true——即noAck——,这意味着不需要确认,这意味着已经确认了零个消息。

当您注释掉消费者时,确认消息的数量可能取决于发布者缓冲区。

使用 AMQP 0.9.1 以编程方式在给定队列上获取精确数量的消息基本上是不可能的。您可以改用message_stats管理 API 中的字段:

http://localhost:15672/api/queues/vhost/queue_name


查看完整回答
反对 回复 2022-10-17
?
汪汪一只猫

TA贡献1898条经验 获得超8个赞

接受的解决方案将是黑绿色的。证明是下面的替换,只需将问题部分中的消费者和发布者代码替换为:


// Listen

    eventQueue, err := ch.Consume(

        q.Name, // queue

        "",     // consumer

        false,  // auto-ack <-- Difference

        false,  // exclusive

        false,  // no-local

        false,  // no-wait

        nil,    // args

    )


    if err != nil {

        panic(err)

    }


    go func() {


        for a := range eventQueue {

            err = ch.Ack(a.DeliveryTag, false) // <-- Difference

            if err != nil {

                panic(err)

            }

            fmt.Printf("Received Event %s\n", string(a.Body))

            time.Sleep(time.Second * 4)

        }

    }()


    go func() {

        count := 0

        for {

            err = ch.Publish(exchangeName, "", false, false, amqp.Publishing{

                ContentType: "application/json",

                Body:        []byte(fmt.Sprintf("Message %d", count)),

            })


            fmt.Printf("Sent Message %d\n", count)

            count++

            if err != nil {

                panic(err)

            }

            if count >= 20 { // <-- Difference

                break

            }

            time.Sleep(time.Second * 2)

        }

    }()

输出:


.... The increase in the queue length

Sent Message 13

Queue Len: 8.000000 - 0

Queue Len: 8.000000 - 0

Received Event Message 4

Sent Message 14

Queue Len: 8.000000 - 0

Queue Len: 9.000000 - 0

Sent Message 15

Queue Len: 9.000000 - 0

Queue Len: 9.000000 - 0

Received Event Message 5

Sent Message 16

Queue Len: 9.000000 - 0

Queue Len: 9.000000 - 0

Sent Message 17

Queue Len: 11.000000 - 0

Queue Len: 11.000000 - 0

Received Event Message 6

Sent Message 18

Queue Len: 11.000000 - 0

Queue Len: 11.000000 - 0

Sent Message 19

Queue Len: 11.000000 - 0

Queue Len: 12.000000 - 0

Received Event Message 7

Queue Len: 12.000000 - 0

Queue Len: 12.000000 - 0

Queue Len: 12.000000 - 0

Queue Len: 12.000000 - 0

Received Event Message 8

Queue Len: 12.000000 - 0

Queue Len: 12.000000 - 0

Queue Len: 12.000000 - 0

Queue Len: 12.000000 - 0

Received Event Message 9

Queue Len: 12.000000 - 0

Queue Len: 11.000000 - 0

Queue Len: 11.000000 - 0

Queue Len: 11.000000 - 0

Received Event Message 10

Queue Len: 11.000000 - 0

Queue Len: 11.000000 - 0

Queue Len: 10.000000 - 0

Queue Len: 10.000000 - 0

Received Event Message 11

Queue Len: 10.000000 - 0

Queue Len: 10.000000 - 0

Queue Len: 10.000000 - 0

Queue Len: 9.000000 - 0

Received Event Message 12

....

As publisher exits it decreases, the consumer catches up and message len decreases:

Received Event Message 16

Queue Len: 5.000000 - 0

Queue Len: 5.000000 - 0

Queue Len: 5.000000 - 0

Queue Len: 4.000000 - 0

Received Event Message 17

Queue Len: 4.000000 - 0

Queue Len: 4.000000 - 0

Queue Len: 4.000000 - 0

Queue Len: 4.000000 - 0

Received Event Message 18

Queue Len: 2.000000 - 0

Queue Len: 2.000000 - 0

Queue Len: 2.000000 - 0

Queue Len: 2.000000 - 0

Received Event Message 19

Queue Len: 2.000000 - 0

Queue Len: 1.000000 - 0




查看完整回答
反对 回复 2022-10-17
  • 2 回答
  • 0 关注
  • 188 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信