当我尝试运行时抛出空指针错误时,我试图根据时间戳获取 Kafka 主题的偏移量,Map<TopicPartition, Long> timestampsToSearch = new HashMap<>(); for (TopicPartition partition : partitions) { timestampsToSearch.put(partition, startTimestamp); }Map<TopicPartition, OffsetAndTimestamp> outOffsets = consumer.offsetsForTimes(timestampsToSearch); for (TopicPartition partition : partitions) { Long seekOffset = outOffsets.get(partition).offset();consumer.seek(partition, seekOffset);任何帮助将不胜感激。
2 回答
杨__羊羊
TA贡献1943条经验 获得超7个赞
要找到与时间戳对应的偏移量,您需要使用 方法offsetsForTimes()
。
例如,这将打印mytopic
对应于 1 秒前的分区 0 的偏移量:
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);) { Map<TopicPartition, Long> timestamps = new HashMap<>(); timestamps.put(new TopicPartition("mytopic", 0), System.currentTimeMillis()-1*1000); Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestamps); System.err.println(offsets); }
这将显示如下内容:
{offset-test-0=(timestamp=1561469319192, leaderEpoch=0, offset=100131)}
白猪掌柜的
TA贡献1893条经验 获得超10个赞
您可以Admin.listOffsets
使用OffsetSpec.forTimestamp
:
Map<TopicPartition, OffsetSpec> topicOffsetSpecs = new HashMap<>();
TopicPartition topicPartition = new TopicPartition("topic1", 0);
OffsetSpec offsetSpec = OffsetSpec.forTimestamp(timestamp);
topicOffsetSpecs.put(topicPartition, offsetSpec);
admin.listOffsets(topicOffsetSpecs).all().get(); // Info for given timestamp
添加回答
举报
0/150
提交
取消