我尝试为 NATS 限制队列编写订阅者:sub, err := js.SubscribeSync(fullSubject, nats.Context(ctx))if err != nil { return err}msg, err := sub.NextMsgWithContext(ctx)if err != nil { if errors.Is(err, nats.ErrSlowConsumer) { log.Printf("Slow consumer error returned. Waiting for reset...") time.Sleep(50 * time.Millisecond) continue } else { return err }}msg.InProgress()var message pnats.NatsMessageif err := conn.unmarshaller(msg.Data, &message); err != nil { msg.Term() return err}actualSubject := message.Context.FullSubject()handler, ok := callbacks[message.Context.Category]if !ok { msg.Nak() continue}callback, err := handler(&message)if err == nil { msg.Ack() msg.Term()} else { msg.Nak() return err}callback(ctx)此代码的目标是使用有关多个主题的任何消息并调用与该主题关联的回调函数。handler这段代码有效,但我遇到的问题是,如果该函数未返回错误,我希望在调用后删除该消息。我以为那是msg.Term在做什么,但我仍然看到队列中的所有消息。我最初是围绕一个工作队列设计的,但我希望它能与多个订阅者一起工作,所以我不得不重新设计它。有什么办法可以使这项工作?
1 回答
梵蒂冈之花
TA贡献1900条经验 获得超5个赞
根据提供的代码,我假设您在使用 JetStream 库创建订阅时没有提供流和消费者信息。
在该方法的文档SubscribeSync
中,它说当未提供流和消费者信息时,库将创建一个临时消费者,消费者的名称由服务器选择。它还试图弄清楚订阅是针对哪个流。
这是我相信在您的代码中发生的事情:
当您调用该
SubscribeSync
方法时,将使用您提供的主题创建一个临时消费者。当
msg.Ack
和msg.Term
被调用时,您确实确认了该消息,但仅限于当前消费者。下次调用该
SubscribeSync
方法时,将创建一个新的临时消费者,其中包含您已在另一个消费者上删除的消息。Jetstream 的流、消费者和订阅概念就是这样设计的。
根据您想要完成的任务,这里有一些建议:
使用普通的 NATS 核心库来处理发布/订阅或队列。不要使用 JetStream。NATS Core 库直接处理主题,而 Jetstream 库在未提供信息的情况下创建额外的东西(流和消费者)。
使用 JetStream,但通过代码或直接在 NATS 服务器上自己创建流和持久消费者。这样,在定义了流和消费者的情况下,您应该能够使其按预期工作。
- 1 回答
- 0 关注
- 139 浏览
添加回答
举报
0/150
提交
取消