我正在尝试在 ksqldb 之上构建一个应用程序。假设我将有一个简单的生产者:package mainimport ( "fmt" "github.com/rmoff/ksqldb-go" "net/http")var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()func init() { offset := `SET 'auto.offset.reset' = 'earliest';` if err := client.Execute(offset); err != nil { panic(err) } s1 := ` CREATE OR REPLACE STREAM userEvents ( userId VARCHAR KEY, eventType VARCHAR ) WITH ( kafka_topic='user_events', value_format='json', partitions=8 ); ` if err := client.Execute(s1); err != nil { panic(err) }}func main() { http.HandleFunc("/emit", hello) http.ListenAndServe(":4201", nil)}func hello(w http.ResponseWriter, req *http.Request) { userId := req.URL.Query().Get("userId") if userId == "" { http.Error(w, "no userId", 400) return } userEvent := req.URL.Query().Get("event") if userEvent == "" { userEvent = "unknown" } err := client.Execute(fmt.Sprintf("INSERT INTO userEvents (userId, eventType) VALUES ('%s', '%s');", userId, userEvent)) if err != nil { http.Error(w, err.Error(), 500) return } w.WriteHeader(200) return}此应用程序创建一个数据流并公开一个端点以使用数据填充流。
1 回答
侃侃尔雅
TA贡献1801条经验 获得超16个赞
首先,请注意我不再维护该客户端,您可能想查看https://github.com/thmeitz/ksqldb-go。
现在回答你的问题。如果我理解正确,您希望出于并行目的运行同一逻辑使用者的多个实例,因此每条消息都应由该逻辑使用者处理一次。
如果是这种情况,那么您正在描述 Kafka 中所谓的消费者组。消费者的多个实例使用相同的客户端 ID 标识自己,Kafka 确保来自源主题分区的数据被路由到该组中的可用消费者。如果有四个消费者和八个分区,每个消费者将从两个分区获取数据。如果一个消费者离开了该组(它崩溃,您缩小规模等),那么 Kafka 会将该消费者的分区重新分配给该组的剩余消费者。
这与您所看到的行为不同,您在其中有效地实例化了多个独立的消费者。按照设计,Kafka 确保订阅某个主题的每个消费者都能接收到该主题的所有消息。
我这里特意说的是Kafka,而不是ksqlDB。这是因为 ksqlDB 是建立在 Kafka 之上的,为了理解您所看到的内容,解释基础知识很重要。
要获得您正在寻找的行为,您可能希望直接在您的消费者应用程序中使用消费者 API。您可以在此 Golang 和 Kafka 快速入门中查看消费者 API 的示例。要创建一个消费者组,您需要指定一个唯一的group.id
.
- 1 回答
- 0 关注
- 150 浏览
添加回答
举报
0/150
提交
取消