我是这个Apache Kafka主题的新手,我正在编写一些基本的生产者 - 消费者代码,并且我遇到了一些消费者代码的问题,在启动zookeeper和Kafka之后,我创建了一个主题名称“firsttopic”,并且我正在使用CLI命令作为生产者输入一些事件,并且作为消费者检索这些事件,我已经使用Kafka-go编写了一个go代码,我在下面附加了该代码以及我也面临的错误。对于卡夫卡,我使用的是“github.com/segmentio/kafka-go”。func Startkafka() { conf := kafka.ReaderConfig{ Brokers: []string{"localhost:9092"}, Topic: "firsttopic", GroupID: "g1", MaxBytes: 10, } reader := kafka.NewReader(conf) for { m, err := reader.ReadMessage((context.Background())) if err != nil { fmt.Println("Some error occured", err) continue } fmt.Println("Message is : ", string(m.Value)) }}func main() { go Startkafka() fmt.Println("Kafka has been started...")}错误:卡夫卡已启动...读取 tcp 127.0.0.1:34858->127.0.1.1:9092 时发生某些错误:i/o 超时
3 回答
GCT1015
TA贡献1827条经验 获得超4个赞
根据该错误,您应该从映射到本地主机的 /etc/hosts 中删除 127.0.1.1,因为在客户端连接到本地主机期间,它应该保持为 127.0.0.1
或者,改为连接到 127.0.0.1:9092
千巷猫影
TA贡献1829条经验 获得超7个赞
在 u main 函数中,在 .然后,它在另一个堆栈中启动并发运行,而主例程结束执行。StartKafka()go routine
删除它并运行该应用程序。它将阻止应用程序并从卡夫卡主题消费。go routine
如果您需要为应用程序构建适当的正常关闭过程,则必须使用通道或其他内容。但以下代码仅用于使用主题内容。
func main() {
fmt.Println("starting kafka...")
Startkafka()
}
撒科打诨
TA贡献1934条经验 获得超2个赞
最大字节数:10 非常低,您应该增加它,因为这可能会导致问题。代码的其余部分似乎是正确的。您应该首先尝试使用 Kafka 安装附带的 Kafka 使用者进行连接,无论它是否启动,这也会验证您正在使用的连接字符串。
- 3 回答
- 0 关注
- 73 浏览
添加回答
举报
0/150
提交
取消