2 回答
TA贡献1810条经验 获得超5个赞
在 JetStream 中有一种方法可以实现与时间相关的检索。
now := time.Now()
oneAndHalfSecondAgo := now.Add(time.Millisecond * -1500)
js, _ := nc.JetStream()
sub, err := js.SubscribeSync(
"foo.*",
nats.OrderedConsumer(),
nats.StartTime(oneAndHalfSecondAgo),
)
for {
msg, err := sub.NextMsg(10 * time.Second) //oldest->newer ones
if err != nil {
log.Fatal(err)
}
// 1. check timestamp of message and if its after ‘now’ then we break out of the for loop here
// 2. if the message is before now we can push it in an array here
}
请注意,这种技术虽然有用,但效率非常低,因为我们一个接一个地抓取消息。
我们可以使用 .Subscribe() (这是异步的)来修改它,但是我们会遇到一个不同的问题:
我们将在当前时刻从 JetStream 过度拉动,然后我们必须确保我们抓取的缓冲消息确实会返回给 JetStream。据我所知,没有配置选项可以告诉 JetStream 有关“MaxTime”的信息。
至于如何“获取最新的 N 消息”,可以修改上面的代码示例,以便在获得所有消息后,他将获得相当高的消息块(pe 所有消息在最后 5 秒或 10 秒或 30 秒内)到现在为止,他可以抓取最新的“N”条消息。
这种技术当然并不理想,但似乎没有其他方法可以做到这一点——至少在撰写本文时还没有。
- 2 回答
- 0 关注
- 102 浏览
添加回答
举报