2 Answers
Contributor with 1810 experience points · more than 5 likes
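Edit:
As suggested in the comments, three additional steps are required:
1. The Transformer must explicitly keep its state in a state store. It obtains a reference to the state store from the ProcessorContext, which is passed to it in the init method.
2. The state store must be registered with the StreamsBuilder.
3. The name of the state store must be passed in the transform method.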
In this example it is enough to store the last message we have seen. We use a KeyValueStore for this, which holds zero or one entry at any point in time.
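import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;

public class PairTransformerSupplier<K,V> implements TransformerSupplier<K, V, KeyValue<K, Pair<V,V>>> {

    private final String storeName;

    public PairTransformerSupplier(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public Transformer<K, V, KeyValue<K, Pair<V, V>>> get() {
        // Each call returns a fresh transformer bound to the same store name.
        return new PairTransformer<>(storeName);
    }
}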
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

public class PairTransformer<K,V> implements Transformer<K, V, KeyValue<K, Pair<V, V>>> {

    private ProcessorContext context;
    private final String storeName;
    private KeyValueStore<Integer, V> stateStore;

    public PairTransformer(String storeName) {
        this.storeName = storeName;
    }

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        this.context = context;
        stateStore = (KeyValueStore<Integer, V>) context.getStateStore(storeName);
    }

    @Override
    public KeyValue<K, Pair<V, V>> transform(K key, V value) {
        // 1. Update the store to remember the last message seen.
        V left = stateStore.get(1);
        if (left == null) {
            // No pending message yet: remember this one and emit nothing.
            stateStore.put(1, value);
            return null;
        }
        // A message is pending: emit the pair and clear the single slot.
        KeyValue<K, Pair<V,V>> result = KeyValue.pair(key, new Pair<>(left, value));
        stateStore.delete(1);
        return result;
    }

    @Override
    public void close() { }
}
public KStream<String, String> sampleStream(StreamsBuilder builder) {
    KStream<String, String> messages = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
    // 2. Create the state store and register it with the streams builder.
    KeyValueBytesStoreSupplier store = Stores.persistentKeyValueStore(stateStoreName);
    StoreBuilder<KeyValueStore<Integer, String>> storeBuilder = Stores.keyValueStoreBuilder(
        store,
        Serdes.Integer(),
        Serdes.String()
    );
    builder.addStateStore(storeBuilder);
    transformToPairs(messages);
    return messages;
}
private void transformToPairs(KStream<String, String> messages) {
    // 3. Reference the name of the state store when calling transform(...).
    KStream<String, Pair<String, String>> pairs = messages.transform(
        new PairTransformerSupplier<>(stateStoreName),
        stateStoreName
    );
    KStream<String, Pair<String, String>> filtered = pairs.filter((key, value) -> value != null);
    KStream<String, String> serialized = filtered.mapValues(Pair::toString);
    serialized.to(outputTopic);
}
Changes to the state store can be observed with the console consumer:
./bin/kafka-console-consumer --topic <changelog-topic-name> --bootstrap-server localhost:9092
The full source code is here: https://github.com/1123/spring-kafka-stream-with-state-store
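The <changelog-topic-name> placeholder in the console consumer command is not spelled out in the answer. Kafka Streams names the changelog topic of a store after the application id and the store name, so a small helper like the following can derive it. This is only a sketch; the application id and store name used in main are hypothetical placeholders, not values from the linked project.

```java
public class ChangelogTopicNameSketch {

    // Kafka Streams convention: "<application.id>-<store name>-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // Hypothetical values; substitute your own application.id and store name.
        System.out.println(changelogTopic("pair-app", "last-message-store"));
    }
}
```

The resulting string can be passed to --topic in the console consumer command above. The values are serialized with the store's serdes, so the consumer may need matching deserializers to print them readably.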
Original answer:
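The JavaDoc of the org.apache.kafka.streams.kstream.ValueMapper interface states that it is meant for stateless record-by-record transformations, whereas the org.apache.kafka.streams.kstream.Transformer interface is
"for stateful mapping of an input record to zero, one, or multiple new output records."
So I guess the Transformer interface is the appropriate choice for collecting pairs of messages. The difference probably only matters when a streams application fails and restarts, so that it can recover its state from Kafka.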
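To illustrate why the restart case matters, here is a dependency-free sketch that does not use Kafka at all. It compares a pairer that keeps the pending message in a plain field with one that keeps it in an external map, which stands in for a durable state store. The class names, the map, and the "restart" (simply creating a new pairer instance) are all assumptions made for illustration. The sketch also assumes the offset of the pending record had already been committed before the restart, so that record is not re-delivered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RestartPairingSketch {

    /** Pairs consecutive values; the pending value lives in a plain field. */
    static class FieldPairer {
        private String left;

        String accept(String value) {
            if (left == null) { left = value; return null; }
            String pair = "(" + left + "," + value + ")";
            left = null;
            return pair;
        }
    }

    /** Same logic, but the pending value lives in a map that outlives the instance. */
    static class StorePairer {
        private final Map<Integer, String> store;

        StorePairer(Map<Integer, String> store) { this.store = store; }

        String accept(String value) {
            String left = store.remove(1);
            if (left == null) { store.put(1, value); return null; }
            return "(" + left + "," + value + ")";
        }
    }

    /** Runs the field-based pairer, replacing it before index restartBefore. */
    static List<String> runWithField(List<String> input, int restartBefore) {
        List<String> out = new ArrayList<>();
        FieldPairer pairer = new FieldPairer();
        for (int i = 0; i < input.size(); i++) {
            if (i == restartBefore) pairer = new FieldPairer(); // simulated restart: field is lost
            String pair = pairer.accept(input.get(i));
            if (pair != null) out.add(pair);
        }
        return out;
    }

    /** Runs the store-based pairer, replacing it before index restartBefore. */
    static List<String> runWithStore(List<String> input, int restartBefore) {
        List<String> out = new ArrayList<>();
        Map<Integer, String> durable = new HashMap<>(); // stand-in for a restored state store
        StorePairer pairer = new StorePairer(durable);
        for (int i = 0; i < input.size(); i++) {
            if (i == restartBefore) pairer = new StorePairer(durable); // simulated restart: map survives
            String pair = pairer.accept(input.get(i));
            if (pair != null) out.add(pair);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList("a", "b", "c", "d", "e", "f");
        System.out.println(runWithField(input, 3)); // prints [(a,b), (d,e)]
        System.out.println(runWithStore(input, 3)); // prints [(a,b), (c,d), (e,f)]
    }
}
```

With the field-based variant the pending message "c" is lost at the restart and the pairing shifts by one. The store-backed variant picks up where it left off.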
So here is another solution, based on the org.apache.kafka.streams.kstream.Transformer interface:
class PairTransformerSupplier<K,V> implements TransformerSupplier<K, V, KeyValue<K, Pair<V,V>>> {

    @Override
    public Transformer<K, V, KeyValue<K, Pair<V, V>>> get() {
        return new PairTransformer<>();
    }
}
public class PairTransformer<K,V> implements Transformer<K, V, KeyValue<K, Pair<V, V>>> {

    // The pending message is kept in memory only.
    private V left;

    @Override
    public void init(ProcessorContext context) {
        left = null;
    }

    @Override
    public KeyValue<K, Pair<V, V>> transform(K key, V value) {
        if (left == null) { left = value; return null; }
        KeyValue<K, Pair<V,V>> result = KeyValue.pair(key, new Pair<>(left, value));
        left = null;
        return result;
    }

    // Needed only on older Kafka Streams releases whose Transformer interface
    // still declares punctuate(long); without @Override it also compiles on newer ones.
    public KeyValue<K, Pair<V, V>> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() { }
}
The PairTransformerSupplier is then used as follows:
private void accumulateTwo(KStream<String, String> messages) {
    messages.transform(new PairTransformerSupplier<>())
            .filter((key, value) -> value != null)
            .mapValues(Pair::toString)
            .to("pairs");
}
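However, trying both solutions in a single process against a topic with a single partition produces exactly the same results. I have not tried a topic with multiple partitions and multiple stream consumers.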
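For the untested multi-partition case, one thing to expect is that Kafka Streams creates a separate transformer instance per stream task, so each partition keeps its own pending message and records are only paired with others from the same partition. The following Kafka-free sketch illustrates that effect under stated assumptions: the partition assignment is passed in explicitly and is hypothetical, and the interleaving of records across partitions is fixed here even though it is not deterministic in a real deployment.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerPartitionPairingSketch {

    // partitions[i] is the (hypothetical) partition that values[i] was written to.
    static List<String> pair(int[] partitions, String[] values) {
        // One pending slot per partition, mimicking one transformer instance per task.
        Map<Integer, String> pendingByPartition = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < values.length; i++) {
            String left = pendingByPartition.remove(partitions[i]);
            if (left == null) {
                pendingByPartition.put(partitions[i], values[i]);
            } else {
                out.add("(" + left + "," + values[i] + ")");
            }
        }
        return out;
    }

    public static void main(String[] args) {
        String[] values = {"a", "b", "c", "d"};
        // Single partition: consecutive messages are paired.
        System.out.println(pair(new int[] {0, 0, 0, 0}, values)); // prints [(a,b), (c,d)]
        // Two partitions, alternating: pairs form within each partition.
        System.out.println(pair(new int[] {0, 1, 0, 1}, values)); // prints [(a,c), (b,d)]
    }
}
```

If pairs must follow the global arrival order, the input topic would need a single partition; otherwise the pairing is per partition.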
Contributor with 1820 experience points · more than 9 likes
You should be able to write an accumulator class:
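class Accumulator implements ValueMapper<String, Optional<Tuple<String>>> {

    // Holds the first item of the pair until the second one arrives.
    private String key;

    // ValueMapper declares apply(...), so the mapping logic has to live there.
    @Override
    public Optional<Tuple<String>> apply(String item) {
        if (key == null) {
            key = item;
            return Optional.empty();
        }
        Optional<Tuple<String>> result = Optional.of(new Tuple<>(key, item));
        key = null;
        return result;
    }
}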
Then process with:
messages.mapValues(new Accumulator())
        // KStream#filter takes a (key, value) predicate, so test the value explicitly.
        .filter((key, value) -> value.isPresent()) // I don't think your filter is correct
        .to("pairs");