我有一个用例,我想在不确认消息的情况下阅读 Pubsub 中的消息。当我不确认传递的消息时,我需要有关如何排除“重复消息”的可能性的帮助,这些消息将保留在 Pubsub 存储中。我想到的解决方案:将拉取的消息存储在 Datastore 中,看看它们是否相同。在运行时存储拉取的消息并检查我的消息是否重复 O(n) 时间复杂度和空间复杂度 O(n)。将拉取的消息存储在文件中,并比较来自文件中消息的新传入消息。使用数据流并排除可能性(最不期望)我认为 Pubsub 中没有类似于 Kafka 的偏移功能。您在这件事上建议的最佳方法/或我可以使用的任何其他替代方法是什么?我正在使用 python google-cloud-pubsub_v1 创建一个 python 客户端并从 Pubsub 中提取消息。我正在共享代码,这是提取数据的逻辑subscription_path = subscriber.subscription_path( project_id, subscription_name) NUM_MESSAGES = 3 # The subscriber pulls a specific number of messages. response = subscriber.pull(subscription_path, max_messages=NUM_MESSAGES) for received_message in response.received_messages: print(received_message.message.data)
1 回答

月关宝盒
TA贡献1772条经验 获得超5个赞
听起来 Pub/Sub 可能不是适合这项工作的工具。似乎您正在尝试将 Pub/Sub 用作持久数据存储,这不是预期的用例。Acking 是 Cloud Pub/Sub 消息生命周期的基本组成部分。如果 Pub/Sub 消息在提供的消息保留期(不能超过 7 天)后未被确认,则会被删除。
我建议您考虑使用像Cloud Spanner这样的 SQL 数据库。然后,您可以为每条消息生成一个 uuid,将其用作重复数据删除的主键,并以事务方式更新数据库以确保没有重复。
如果您提供有关您计划如何处理重复数据删除消息的更多信息,我可能会为您提供更好的答案。
添加回答
举报
0/150
提交
取消