为了账号安全,请及时绑定邮箱和手机立即绑定

使用自定义时间戳提取器的 Kafka 流窗口

使用自定义时间戳提取器的 Kafka 流窗口

精慕HU 2021-11-11 16:42:32
我正在尝试创建一个 Kafka Streams 应用程序,我试图在一个时间窗口内计算每个平台的唯一设备。事件类public class Event {    private String eventId;    private String deviceId;    private String platform;    private ZonedDateTime createdAt;}我需要时间窗口尊重事件的 createdAt 所以我写了一个TimestampExtractor如下的实现:public class EventTimestampExtractor implements TimestampExtractor {    @Override    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {        final Event event = (Event) record.value();        final ZonedDateTime eventCreationTime = event.getCreatedAt();        final long timestamp = eventCreationTime.toEpochSecond();        log.trace("Event ({}) yielded timestamp: {}", event.getEventId(), timestamp);        return timestamp;    }}最后,这是我的流媒体应用代码:final KStream<String, Event> eventStream = builder.stream("events_ingestion");eventStream    .selectKey((key, event) -> {        final String platform = event.getPlatform();        final String deviceId = event.getDeviceId());        return String.join("::", platform, deviceId);    })    .groupByKey()    .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(15)))    .count(Materialized.as(COUNT_STORE));当我将事件推送到event_ingestion主题时,我可以看到时间戳已记录到应用程序日志中,并且数据正在写入计数存储中。当我遍历计数存储时,我看到以下内容:Key: [ANDROID::1@1539000000/1539900000], Value: 2虽然我的时间窗口是 15 分钟,但密钥跨越 10 天。如果我从流配置中删除我的 TimestampExtractor 实现(因此回到处理时间),密钥按预期跨越 15 分钟:Key: [ANDROID::1@1539256500000/1539257400000], Value: 1我在这里做错了什么?有任何想法吗?
查看完整描述

1 回答

?
慕妹3242003

TA贡献1824条经验 获得超6个赞

TimestampExtractor 使用纪元毫秒值进行窗口化。您正在计算“秒”,这会将消息放入错误的时间窗口。


查看完整回答
反对 回复 2021-11-11
  • 1 回答
  • 0 关注
  • 243 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信