为了账号安全,请及时绑定邮箱和手机立即绑定

有没有办法从 Kafka 主题获取最后一条消息?

有没有办法从 Kafka 主题获取最后一条消息?

潇湘沐 2023-06-28 15:28:26
我有一个具有多个分区的 Kafka 主题,我想知道 Java 中是否有一种方法可以获取该主题的最后一条消息。我不关心分区,我只想获取最新消息。我已经尝试过@KafkaListener,但它仅在主题更新时才获取消息。如果应用程序打开后没有发布任何内容,则不会返回任何内容。也许倾听者根本就不是解决问题的正确方法?
查看完整描述

2 回答

?
牛魔王的故事

TA贡献1830条经验 获得超3个赞

以下片段对我有用。你可以试试这个。评论里有解释。


        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singletonList(topic));


        consumer.poll(Duration.ofSeconds(10));


        consumer.assignment().forEach(System.out::println);


        AtomicLong maxTimestamp = new AtomicLong();

        AtomicReference<ConsumerRecord<String, String>> latestRecord = new AtomicReference<>();


        // get the last offsets for each partition

        consumer.endOffsets(consumer.assignment()).forEach((topicPartition, offset) -> {

            System.out.println("offset: "+offset);


            // seek to the last offset of each partition

            consumer.seek(topicPartition, (offset==0) ? offset:offset - 1);


            // poll to get the last record in each partition

            consumer.poll(Duration.ofSeconds(10)).forEach(record -> {


                // the latest record in the 'topic' is the one with the highest timestamp

                if (record.timestamp() > maxTimestamp.get()) {

                    maxTimestamp.set(record.timestamp());

                    latestRecord.set(record);

                }

            });

        });

        System.out.println(latestRecord.get());


查看完整回答
反对 回复 2023-06-28
?
森林海

TA贡献2011条经验 获得超2个赞

您必须使用每个分区中的最新消息,然后在客户端进行比较(使用消息上的时间戳,如果包含它)。原因是 Kafka 不保证分区间的顺序。在分区内,您可以确定偏移量最大的消息是最新推送到该分区的消息。



查看完整回答
反对 回复 2023-06-28
  • 2 回答
  • 0 关注
  • 260 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信