为了账号安全,请及时绑定邮箱和手机立即绑定

动态添加 Kafka 主题以供使用,而无需重新启动我的 GoLang 应用程序

动态添加 Kafka 主题以供使用,而无需重新启动我的 GoLang 应用程序

Go
阿波罗的战车 2022-06-01 11:19:43
我有一个 Golang 应用程序,它基本上以 Kafka 消费者为起点。我在运行时从 MongoDB 获取要收听的主题列表。但是,每次我要添加一个新的主题来听,添加到Mongo之后,我必须重新启动整个Golang应用程序。消费者位于主文件本身中。我正在使用 Confluent 作为客户端。有没有办法在不重新启动应用程序的情况下添加更多主题来消费?
查看完整描述

3 回答

?
慕尼黑8549860

TA贡献1818条经验 获得超11个赞

您是否尝试过使用regular expressions.


例子 :


consume, err := kafka.NewConsumer(&kafka.ConfigMap{

                     "bootstrap.servers":  "server",

})


err = consume.SubscribeTopics([]string{"^.*_mypattern"}, nil)

来源:https ://github.com/confluentinc/confluent-kafka-go/issues/96


在初始化 consumer 时也尝试设置此选项metadata.max.age.ms。这将刷新元数据以查看是否有任何新主题可用。


查看完整回答
反对 回复 2022-06-01
?
GCT1015

TA贡献1827条经验 获得超4个赞

该逻辑的代码片段会有所帮助。


您可以使用Mongo Change Streams来做到这一点。

例如,要查看集合的更改,请使用以下Collection.Watch()方法 -


var collection *mongo.Collection


// specify a pipeline that will only match "insert" events

// specify the MaxAwaitTimeOption to have each attempt wait two seconds for new documents

matchStage := bson.D{{"$match", bson.D{{"operationType", "insert"}}}}

opts := options.ChangeStream().SetMaxAwaitTime(2 * time.Second)

changeStream, err := collection.Watch(context.TODO(), mongo.Pipeline{matchStage}, opts)

if err != nil {

    log.Fatal(err)

}


// print out all change stream events in the order they're received

// see the mongo.ChangeStream documentation for more examples of using change streams

for changeStream.Next(context.TODO()) {

    fmt.Println(changeStream.Current)

    // NewConsumer

}

然后创建一个新的消费者或者.SubscribeTopics()在你更新你的集合并且它符合你的标准时调用


查看完整回答
反对 回复 2022-06-01
?
江户川乱折腾

TA贡献1851条经验 获得超5个赞

如果需要,消费者可以使用来自动态主题的消息。我认为您可能使用 Redis PubSub 而不是 Kafka。

因为当您需要从最近创建的主题中消费时,消费者必须重新连接到代理,并且在频繁添加新主题时成本很高。

我假设新主题描述了一个聊天室/组。如果正确,Redis PubSub Subscription 比 Kafka Consumer 轻。您可以将频道用作聊天室/群组。

或者您可以同时使用 Kafka 和 Redis PubSub,在从 Kafka 消费创建的房间/组事件后,将其设置为 Redis PubSub 的频道,您就可以开始订阅了。



查看完整回答
反对 回复 2022-06-01
  • 3 回答
  • 0 关注
  • 193 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信