为了账号安全,请及时绑定邮箱和手机立即绑定

使用Go阅读来自谷歌酒吧订阅的所有可用消息

使用Go阅读来自谷歌酒吧订阅的所有可用消息

Go
翻阅古今 2022-09-12 21:14:14
我正在尝试从谷歌发布订阅中的主题中获取所有可用消息。但是在前进中,我无法找到一个配置,一旦Pub-Sub中没有更多消息剩余,就可以取消接收回调。我认为一种方法是使用Google云监控API从Pub-Sub获取消息总数,该答案中描述了Google PubSub - 计算主题中的消息数,然后保留已读消息数的计数,如果计数等于该数字,则调用取消,但我不太确定这是否是正确的方法。var mu sync.Mutex    received := 0    sub := client.Subscription(subID)    cctx, cancel := context.WithCancel(ctx)    err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {            mu.Lock()            defer mu.Unlock()            fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))            msg.Ack()            received++            if received == TotalNumberOfMessages {                    cancel()            }    })    if err != nil {            return fmt.Errorf("Receive: %v", err)    }我也尝试过使用超时的上下文,即在取消之后,直到不满足此上下文截止日期为止。ctx, cancel := context.WithTimeout(ctx, 100*time.Second)defer cancel()err = subscription.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {}但话又说回来,这并不能确定所有消息都已得到处理。请建议一个可以确保该订阅的解决方案。当 Pub-Sub 中不再有剩余消息时,接收功能将停止。
查看完整描述

1 回答

?
慕桂英546537

TA贡献1848条经验 获得超10个赞

我已经在我以前的公司中实现了它(可悲的是,我不再有代码,它是在我以前的公司git中...)。然而,它的工作原理。


原则如下


msg := make(chan *pubsub.Message, 1)

sub := client.Subscription(subID)

cctx, cancel := context.WithCancel(ctx)

go sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {

    msg <- m

    })

for {

  select {

    case res := <-msg:

      fmt.Fprintf(w, "Got message: %q\n", string(res.Data))

      res.Ack()

  

    case <-time.After(3 * time.Second):

        fmt.Println("timeout")

        cancel()

    }

}


查看完整回答
反对 回复 2022-09-12
  • 1 回答
  • 0 关注
  • 91 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信