为了账号安全,请及时绑定邮箱和手机立即绑定

如何从头订阅

如何从头订阅

Go
ITMISS 2023-06-12 09:33:26
我正在尝试使用 GroupId 编写一个 Kafka Consumer foo,它订阅某个主题并从头开始读取(即使之前有偏移量)。我尝试与重新平衡回调一起使用Subscribe,但它似乎从未被调用(已设置设置go.application)。有什么例子可以使这项工作成功吗?
查看完整描述

2 回答

?
翻过高山走不出你

TA贡献1875条经验 获得超3个赞

你可能只需要将你的值设置 auto.offset.resetkafka.OffsetBeginning.String()

package main


/**

 * Copyright 2016 Confluent Inc.

 */


// consumer_example implements a consumer using the non-channel Poll() API

// to retrieve messages and events.


import (

    "fmt"

    "github.com/confluentinc/confluent-kafka-go/kafka"

    "os"

    "os/signal"

    "syscall"

)


func main() {


    broker := "YOUR_BROKER"

    group := "YOUR_GROUP"

    topics := "YOUR_TOPICS"

    sigchan := make(chan os.Signal, 1)

    signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)


    c, err := kafka.NewConsumer(&kafka.ConfigMap{

        "bootstrap.servers":  broker,

        "group.id":           group,

        "session.timeout.ms": 6000,

        "auto.offset.reset":  kafka.OffsetBeginning.String()})


    if err != nil {

        fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)

        os.Exit(1)

    }


    fmt.Printf("Created Consumer %v\n", c)


    err = c.SubscribeTopics(topics, nil)


    run := true


    for run == true {

        select {

        case sig := <-sigchan:

            fmt.Printf("Caught signal %v: terminating\n", sig)

            run = false

        default:

            ev := c.Poll(100)

            if ev == nil {

                continue

            }


            switch e := ev.(type) {

            case *kafka.Message:

                fmt.Printf("%% Message on %s:\n%s\n",

                    e.TopicPartition, string(e.Value))

                if e.Headers != nil {

                    fmt.Printf("%% Headers: %v\n", e.Headers)

                }

            case kafka.Error:

                // Errors should generally be considered as informational, the client will try to automatically recover

                fmt.Fprintf(os.Stderr, "%% Error: %v\n", e)

            default:

                fmt.Printf("Ignored %v\n", e)

            }

        }

    }


    fmt.Printf("Closing consumer\n")

    c.Close()

}


查看完整回答
反对 回复 2023-06-12
?
慕码人2483693

TA贡献1860条经验 获得超9个赞

我们现在设置enable.auto.commitfalse. 这样,就不会存储偏移量,我们每次运行都从头开始消费。



查看完整回答
反对 回复 2023-06-12
  • 2 回答
  • 0 关注
  • 143 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信