为了账号安全,请及时绑定邮箱和手机立即绑定

从 Kafka ktable 中正确删除记录

从 Kafka ktable 中正确删除记录

守着一只汪 2022-05-21 17:20:40
我想从 KTable 中正确删除对象。我有以下流处理数据:input.map(this::deserialize).filter(((key, event) ->    Arrays.asList(        "AssessmentInitialized",        "AssessmentRenamed",        "AssessmentDeleted"    ).contains(event.eventType()))).map( ... do mapping  ... ).groupBy((key, event) -> key, Serialized.with(Serdes.String(), domainEventSerde)).aggregate(    Assessment::new,    (key, event, assessment) -> assessment.handleEvent(event),    Materialized.<String, Assessment, KeyValueStore<Bytes, byte[]>>as(ASSESSMENTS_STORE)        .withKeySerde(Serdes.String())        .withValueSerde(assessmentSerde));并在处理事件时assessment.handleEvent(event)返回。Serde 是mapper 默认为 com.fasterxml.jackson.databind.ObjectMapper bean 的地方。nullAssessmentDeletedthis.assessmentSerde = new JsonSerde<>(Assessment.class, objectMapper);在此流处理我的事件后,我在 Kafka KTable 更改日志中看到以下值:{"topic": "events-consumer-v1-assessments-store-changelog","key": "5d6d7a70-c907-460f-ac88-69705f079939","value": "ée","partition": 0,"offset": 2}它看起来不像我想要的东西。这种删除对象的方式正确吗?我预计如果我将 null 作为值推送到聚合操作,它将被删除,但看起来像留下了一些垃圾,我不确定它的映射器问题或不正确的删除方式或正确的 KTable 行为。
查看完整描述

1 回答

?
慕莱坞森

TA贡献1810条经验 获得超4个赞

在您看来,问题出在检查工具中。由于某种原因,它null不正确地反序列化值。首先使用 Kafka 工具进行检查总是好的(kafka-console-consumer.shkafka-console-producer.sh)。



查看完整回答
反对 回复 2022-05-21
  • 1 回答
  • 0 关注
  • 156 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信