为了账号安全,请及时绑定邮箱和手机立即绑定

如何在同一主题上使用globalKtable和StateStore?

如何在同一主题上使用globalKtable和StateStore?

九州编程 2024-01-28 16:22:42
只是为了澄清,我是卡夫卡的新手,很抱歉,如果我的问题似乎没有记录,我正在阅读教程、文档和我能理解的一切。我试图从 GlobalStore 读取所有值以更新其值,然后使用已存在的 StateStore 来放置这些新的更新值。我正在尝试这样做,因为当我这样做时:this.stateStore.all();我只有1/10的数据,如果我理解正确的话,这是因为我有10个分区,而ss,只读取一个(虽然我不太明白为什么)这是我的全局表:    public StreamsBuilder declareTopology(StreamsBuilder builder) {        logger.debug("Building topology : input topic ~ {} ; output topics ~ {}, {}",                getInputTopic(),                getDataTopic(),                getToEsTopic());        builder.globalTable(                getDataTopic(),                Consumed.with(Serdes.String(), fooSerdes)                        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST),                Materialized.<String, Foo, KeyValueStore<Bytes, byte[]>>as(                        "foosktable")                        .withKeySerde(Serdes.String())                        .withValueSerde(fooSerdes)                        .withLoggingEnabled(new HashMap<>()));    ...这是 addStateStore,我无法删除它,因为它在代码的其他地方使用:       ...       builder.addStateStore(            Stores.keyValueStoreBuilder(                    Stores.persistentKeyValueStore("foosktable"),                    Serdes.String(),                    fooSerdes));    ...    return builder;}因此,从理论上讲,我想做的是删除也使用相同主题的 StateStore,并使用我的 data.process 主题之一放置我的数据,问题是该处理器使用此 StateStore 执行其他操作,所以我不能用核武器攻击它。我在这里迷路了,任何光都会有很大帮助。谢谢 !
查看完整描述

1 回答

?
胡说叔叔

TA贡献1804条经验 获得超8个赞

有点不清楚你真正想要实现的目标是什么。然而,一些高级解释:

AGlobalKTable只有一个目的:从主题读取数据而不进行修改,以允许执行连接KStream-GlobalKTable或通过“交互式查询”查询存储。

因此,您无法真正执行您想要的操作,因为无法按照您的意图将数据从全局存储复制到另一个存储。您需要复制输入主题并读取两次:(1) 和GlobalKTable(2) 常规,KStream以在将数据放入商店之前修改数据。对于 (2),您可以使用transform().

希望这可以帮助。


查看完整回答
反对 回复 2024-01-28
  • 1 回答
  • 0 关注
  • 75 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信