为了账号安全,请及时绑定邮箱和手机立即绑定

Lagom 服务使用来自 Kafka 的输入

Lagom 服务使用来自 Kafka 的输入

哔哔one 2022-06-04 11:04:39
我试图弄清楚如何使用 Lagom 来使用来自通过 Kafka 通信的外部系统的数据。我遇到了Lagom 文档的这一部分,它描述了 Lagom 服务如何通过订阅其主题与另一个 Lagom 服务进行通信。helloService  .greetingsTopic()  .subscribe // <-- you get back a Subscriber instance  .atLeastOnce(  Flow.fromFunction(doSomethingWithTheMessage))但是,当您想要订阅包含由某个随机外部系统产生的事件的 Kafka 主题时,合适的配置是什么?此功能是否需要某种适配器?为了澄清,我现在有这个:object Aggregator {  val TOPIC_NAME = "my-aggregation"}trait Aggregator extends Service {  def aggregate(correlationId: String): ServiceCall[Data, Done]  def aggregationTopic(): Topic[DataRecorded]  override final def descriptor: Descriptor = {    import Service._    named("aggregator")      .withCalls(        pathCall("/api/aggregate/:correlationId", aggregate _)      )      .withTopics(        topic(Aggregator.TOPIC_NAME, aggregationTopic())          .addProperty(            KafkaProperties.partitionKeyStrategy,            PartitionKeyStrategy[DataRecorded](_.sessionData.correlationId)          )      )      .withAutoAcl(true)  }}我可以通过简单的 POST 请求调用它。但是,我希望通过使用Data来自某些(外部)Kafka 主题的消息来调用它。我想知道是否有一种方法可以以类似于此模型的方式配置描述符:override final def descriptor: Descriptor = {  ...  kafkaTopic("my-input-topic")    .subscribe(serviceCall(aggregate _)    .withAtMostOnceDelivery}我在 Google Groups 上遇到过这个讨论,但是在 OPs 问题中,我没有看到他实际上对EventMessages做了任何事情,some-topic除了将它们路由到他的服务定义的主题。
查看完整描述

2 回答

?
至尊宝的传说

TA贡献1789条经验 获得超10个赞

我不使用lagom所以这可能只是一个想法。但作为akka-streams(lagom至少我认为)的一部分 - 从这个解决方案中得到你需要的东西应该很容易。


我使用了 akka-stream-kafka,效果非常好(我只做了一个原型)


当你消费消息时,你会做一些事情:


     Consumer

      .committableSource(

          consumerSettings(..), // config of Kafka

          Subscriptions.topics("kafkaWsPathMsgTopic")) // Topic to subscribe

      .mapAsync(10) { msg =>

        business(msg.record) // do something

      }


查看完整回答
反对 回复 2022-06-04
?
慕的地6264312

TA贡献1817条经验 获得超6个赞

Alan Klikic 在此处的 Lightbend 讨论论坛上提供了答案。

第1部分:

如果您只在业务服务中使用外部 Kafka 集群,那么您可以仅使用 Lagom Broker API 来实现它。所以你需要:

  1. 使用仅具有主题定义的服务描述符创建 API(此 API 未实现)

  2. 在您的业务服务中根据您的部署配置 kafka_native(正如我在上一篇文章中提到的)

  3. 在您的业务服务中从 #1 中创建的 API 注入服务并使用 Lagom Broker API 订阅者订阅它

偏移提交,在 Lagom Broker API 订阅者是开箱即用的。

第2部分:

Kafka 和 AMQP 消费者实现需要持久的 akka 流。所以你需要处理断开连接。这些可以通过两种方式完成:

  1. 通过将其包装在一个actor中来控制持久的akka流。您在actor preStart 上初始化流流,并将流完成传输到将停止它的actor。如果流完成或失败,演员将停止。然后使用重启策略将actor包装在actor回退中,这将在完成或失败的情况下重新启动actor并重新初始化Flow

  2. akka 流延迟重启与退避阶段

Personnaly 我使用 #1 并没有尝试 #2。

可以在您的 Lagom 组件 trait 中为 #1 或 Flow 为 #2 初始化退避演员(基本上在您现在使用 Lagom Broker API 进行订阅的地方)。

配置consumer时一定要设置consumer group,避免重复消费。您可以像 Lagom 一样使用描述符中的服务名称作为消费者组名称。


查看完整回答
反对 回复 2022-06-04
  • 2 回答
  • 0 关注
  • 69 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信