2 回答
TA贡献1789条经验 获得超10个赞
我不使用lagom所以这可能只是一个想法。但作为akka-streams(lagom至少我认为)的一部分 - 从这个解决方案中得到你需要的东西应该很容易。
我使用了 akka-stream-kafka,效果非常好(我只做了一个原型)
当你消费消息时,你会做一些事情:
Consumer
.committableSource(
consumerSettings(..), // config of Kafka
Subscriptions.topics("kafkaWsPathMsgTopic")) // Topic to subscribe
.mapAsync(10) { msg =>
business(msg.record) // do something
}
TA贡献1817条经验 获得超6个赞
Alan Klikic 在此处的 Lightbend 讨论论坛上提供了答案。
第1部分:
如果您只在业务服务中使用外部 Kafka 集群,那么您可以仅使用 Lagom Broker API 来实现它。所以你需要:
使用仅具有主题定义的服务描述符创建 API(此 API 未实现)
在您的业务服务中根据您的部署配置 kafka_native(正如我在上一篇文章中提到的)
在您的业务服务中从 #1 中创建的 API 注入服务并使用 Lagom Broker API 订阅者订阅它
偏移提交,在 Lagom Broker API 订阅者是开箱即用的。
第2部分:
Kafka 和 AMQP 消费者实现需要持久的 akka 流。所以你需要处理断开连接。这些可以通过两种方式完成:
通过将其包装在一个actor中来控制持久的akka流。您在actor preStart 上初始化流流,并将流完成传输到将停止它的actor。如果流完成或失败,演员将停止。然后使用重启策略将actor包装在actor回退中,这将在完成或失败的情况下重新启动actor并重新初始化Flow
akka 流延迟重启与退避阶段
Personnaly 我使用 #1 并没有尝试 #2。
可以在您的 Lagom 组件 trait 中为 #1 或 Flow 为 #2 初始化退避演员(基本上在您现在使用 Lagom Broker API 进行订阅的地方)。
配置consumer时一定要设置consumer group,避免重复消费。您可以像 Lagom 一样使用描述符中的服务名称作为消费者组名称。
添加回答
举报