3 回答
TA贡献1818条经验 获得超11个赞
您是否尝试过使用regular expressions.
例子 :
consume, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": "server",
})
err = consume.SubscribeTopics([]string{"^.*_mypattern"}, nil)
来源:https ://github.com/confluentinc/confluent-kafka-go/issues/96
在初始化 consumer 时也尝试设置此选项metadata.max.age.ms。这将刷新元数据以查看是否有任何新主题可用。
TA贡献1827条经验 获得超4个赞
该逻辑的代码片段会有所帮助。
您可以使用Mongo Change Streams来做到这一点。
例如,要查看集合的更改,请使用以下Collection.Watch()方法 -
var collection *mongo.Collection
// specify a pipeline that will only match "insert" events
// specify the MaxAwaitTimeOption to have each attempt wait two seconds for new documents
matchStage := bson.D{{"$match", bson.D{{"operationType", "insert"}}}}
opts := options.ChangeStream().SetMaxAwaitTime(2 * time.Second)
changeStream, err := collection.Watch(context.TODO(), mongo.Pipeline{matchStage}, opts)
if err != nil {
log.Fatal(err)
}
// print out all change stream events in the order they're received
// see the mongo.ChangeStream documentation for more examples of using change streams
for changeStream.Next(context.TODO()) {
fmt.Println(changeStream.Current)
// NewConsumer
}
然后创建一个新的消费者或者.SubscribeTopics()在你更新你的集合并且它符合你的标准时调用
TA贡献1851条经验 获得超5个赞
如果需要,消费者可以使用来自动态主题的消息。我认为您可能使用 Redis PubSub 而不是 Kafka。
因为当您需要从最近创建的主题中消费时,消费者必须重新连接到代理,并且在频繁添加新主题时成本很高。
我假设新主题描述了一个聊天室/组。如果正确,Redis PubSub Subscription 比 Kafka Consumer 轻。您可以将频道用作聊天室/群组。
或者您可以同时使用 Kafka 和 Redis PubSub,在从 Kafka 消费创建的房间/组事件后,将其设置为 Redis PubSub 的频道,您就可以开始订阅了。
- 3 回答
- 0 关注
- 193 浏览
添加回答
举报