为了账号安全,请及时绑定邮箱和手机立即绑定

ksqldb——拉式查询的一次交付,多个应用程序实例

ksqldb——拉式查询的一次交付,多个应用程序实例

Go
慕容森 2022-11-08 15:58:26
我正在尝试在 ksqldb 之上构建一个应用程序。假设我将有一个简单的生产者:package mainimport (    "fmt"    "github.com/rmoff/ksqldb-go"    "net/http")var client = ksqldb.NewClient("http://localhost:8088", "", "").Debug()func init() {    offset := `SET 'auto.offset.reset' = 'earliest';`    if err := client.Execute(offset); err != nil {        panic(err)    }    s1 := `        CREATE OR REPLACE STREAM userEvents (            userId VARCHAR KEY,            eventType VARCHAR        )        WITH (            kafka_topic='user_events',             value_format='json',             partitions=8        );    `    if err := client.Execute(s1); err != nil {        panic(err)    }}func main() {    http.HandleFunc("/emit", hello)    http.ListenAndServe(":4201", nil)}func hello(w http.ResponseWriter, req *http.Request) {    userId := req.URL.Query().Get("userId")    if userId == "" {        http.Error(w, "no userId", 400)        return    }    userEvent := req.URL.Query().Get("event")    if userEvent == "" {        userEvent = "unknown"    }    err := client.Execute(fmt.Sprintf("INSERT INTO userEvents (userId, eventType) VALUES ('%s', '%s');",        userId, userEvent))    if err != nil {        http.Error(w, err.Error(), 500)        return    }    w.WriteHeader(200)    return}此应用程序创建一个数据流并公开一个端点以使用数据填充流。
查看完整描述

1 回答

?
侃侃尔雅

TA贡献1801条经验 获得超16个赞

首先,请注意我不再维护该客户端,您可能想查看https://github.com/thmeitz/ksqldb-go


现在回答你的问题。如果我理解正确,您希望出于并行目的运行同一逻辑使用者的多个实例,因此每条消息都应由该逻辑使用者处理一次。

如果是这种情况,那么您正在描述 Kafka 中所谓的消费者组。消费者的多个实例使用相同的客户端 ID 标识自己,Kafka 确保来自源主题分区的数据被路由到该组中的可用消费者。如果有四个消费者和八个分区,每个消费者将从两个分区获取数据。如果一个消费者离开了该组(它崩溃,您缩小规模等),那么 Kafka 会将该消费者的分区重新分配给该组的剩余消费者。

这与您所看到的行为不同,您在其中有效地实例化了多个独立的消费者。按照设计,Kafka 确保订阅某个主题的每个消费者都能接收到该主题的所有消息。

我这里特意说的是Kafka,而不是ksqlDB。这是因为 ksqlDB 是建立在 Kafka 之上的,为了理解您所看到的内容,解释基础知识很重要。

要获得您正在寻找的行为,您可能希望直接在您的消费者应用程序中使用消费者 API。您可以在此 Golang 和 Kafka 快速入门中查看消费者 API 的示例。要创建一个消费者组,您需要指定一个唯一的group.id.


查看完整回答
反对 回复 2022-11-08
  • 1 回答
  • 0 关注
  • 150 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信