为了账号安全,请及时绑定邮箱和手机立即绑定

如何获取时间戳指定的kafka偏移数据

如何获取时间戳指定的kafka偏移数据

慕桂英4014372 2023-02-16 16:40:42
当我尝试运行时抛出空指针错误时,我试图根据时间戳获取 Kafka 主题的偏移量,Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();              for (TopicPartition partition : partitions) {                timestampsToSearch.put(partition,  startTimestamp);              }Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch);              for (TopicPartition partition : partitions) {                Long seekOffset = outOffsets.get(partition).offset();consumer.seek(partition, seekOffset);任何帮助将不胜感激。
查看完整描述

2 回答

?
杨__羊羊

TA贡献1943条经验 获得超7个赞

要找到与时间戳对应的偏移量,您需要使用 方法offsetsForTimes()

例如,这将打印mytopic对应于 1 秒前的分区 0 的偏移量:

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);) {
    Map<TopicPartition, Long> timestamps = new HashMap<>();
    timestamps.put(new TopicPartition("mytopic", 0), System.currentTimeMillis()-1*1000);
    Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps);
    System.err.println(offsets);
}

这将显示如下内容:

{offset-test-0=(timestamp=1561469319192, leaderEpoch=0, offset=100131)}


查看完整回答
反对 回复 2023-02-16
?
白猪掌柜的

TA贡献1893条经验 获得超10个赞

您可以Admin.listOffsets使用OffsetSpec.forTimestamp

Map<TopicPartition, OffsetSpec> topicOffsetSpecs = new HashMap<>();

TopicPartition topicPartition = new TopicPartition("topic1", 0);

OffsetSpec offsetSpec = OffsetSpec.forTimestamp(timestamp);

topicOffsetSpecs.put(topicPartition, offsetSpec);

admin.listOffsets(topicOffsetSpecs).all().get(); // Info for given timestamp


查看完整回答
反对 回复 2023-02-16
  • 2 回答
  • 0 关注
  • 444 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信