为了账号安全,请及时绑定邮箱和手机立即绑定

Kafka-Streaming:如何收集消息对并写入新主题

Kafka-Streaming:如何收集消息对并写入新主题

哆啦的时光机 2021-10-13 10:17:19
这是kafka-streaming的初学者问题。您将如何使用 java kafka-streaming 库收集消息对并将它们写入新的输出主题?我在想这样的事情:private void accumulateTwo(KStream<String, String> messages) {    Optional<String> accumulator = Optional.empty();    messages.mapValues(value -> {        if (accumulator.isPresent()) {            String tmp = accumulator.get();            accumulator = Optional.empty();            return Optional.of(new Tuple<>(tmp, value));        }        else {            accumulator = Optional.of(value);            return Optional.empty();        }    }).filter((key, value) -> value.isPresent()).to("pairs");然而这行不通,因为 Java Lambda 表达式中的变量必须是 final 的。有任何想法吗?
查看完整描述

2 回答

?
森栏

TA贡献1810条经验 获得超5个赞

编辑:

正如评论中所建议的,需要另外三个步骤:

  1. Transformer状态存储中必须明确地保存其状态。它将从 中获取对状态存储的引用ProcessorContext,并在init方法中传递该引用。

  2. 国家商店必须在 StreamsBuilder

  3. 必须在transform方法中传递状态存储的名称。

在这个例子中,存储我们看到的最后一条消息就足够了。我们KeyValueStore为此使用 a ,它在每个时间点都有零个或一个条目。

public class PairTransformerSupplier<K,V> implements TransformerSupplier<K, V, KeyValue<K, Pair<V,V>>> {


    private String storeName;


    public PairTransformerSupplier(String storeName) {

        this.storeName = storeName;

    }


    @Override

    public Transformer<K, V, KeyValue<K, Pair<V, V>>> get() {

        return new PairTransformer<>(storeName);

    }

}



public class PairTransformer<K,V> implements Transformer<K, V, KeyValue<K, Pair<V, V>>> {

    private ProcessorContext context;

    private String storeName;

    private KeyValueStore<Integer, V> stateStore;


    public PairTransformer(String storeName) {

        this.storeName = storeName;

    }


    @Override

    public void init(ProcessorContext context) {

        this.context = context;

        stateStore = (KeyValueStore<Integer, V>) context.getStateStore(storeName);

    }


    @Override

    public KeyValue<K, Pair<V, V>> transform(K key, V value) {

        // 1. Update the store to remember the last message seen. 

        if (stateStore.get(1) == null) {

            stateStore.put(1, value); return null;

        }

        KeyValue<K, Pair<V,V>> result = KeyValue.pair(key, new Pair<>(stateStore.get(1), value));

        stateStore.put(1, null);

        return result;

    }


    @Override

    public void close() { }


}



public KStream<String, String> sampleStream(StreamsBuilder builder) {

    KStream<String, String> messages = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

    // 2. Create the state store and register it with the streams builder. 

    KeyValueBytesStoreSupplier store = Stores.persistentKeyValueStore(stateStoreName);

    StoreBuilder storeBuilder = new KeyValueStoreBuilder<>(

            store,

            new Serdes.IntegerSerde(),

            new Serdes.StringSerde(),

            Time.SYSTEM

    );

    builder.addStateStore(storeBuilder);

    transformToPairs(messages);

    return messages;

}


private void transformToPairs(KStream<String, String> messages) {

    // 3. reference the name of the state store when calling transform(...)

    KStream<String, Pair<String, String>> pairs = messages.transform(

            new PairTransformerSupplier<>(),

            stateStoreName

    );

    KStream<String, Pair<String, String>> filtered = pairs.filter((key, value) -> value != null);

    KStream<String, String> serialized = filtered.mapValues(Pair::toString);

    serialized.to(outputTopic);

}

可以使用控制台消费者观察状态存储的变化:


./bin/kafka-console-consumer --topic <changelog-topic-name> --bootstrap-server localhost:9092

完整源代码在这里:https : //github.com/1123/spring-kafka-stream-with-state-store


原答案:

org.apache.kafka.streams.kstream.ValueMapper接口的 JavaDoc声明它用于无状态的逐条记录转换,而org.apache.kafka.streams.kstream.Transformer另一方面,接口是


用于将输入记录有状态映射到零、一个或多个新输出记录。


因此我猜这个Transformer接口是收集消息对的合适选择。这可能仅在流应用程序出现故障和重新启动的情况下才有意义,以便它们可以从 Kafka 恢复状态。


因此,这是基于org.apache.kafka.streams.kstream.Transformer接口的另一种解决方案:


class PairTransformerSupplier<K,V> implements TransformerSupplier<K, V, KeyValue<K, Pair<V,V>>> {


    @Override

    public Transformer<K, V, KeyValue<K, Pair<V, V>>> get() {

        return new PairTransformer<>();

    }

}


public class PairTransformer<K,V> implements Transformer<K, V, KeyValue<K, Pair<V, V>>> {

    private V left;


    @Override

    public void init(ProcessorContext context) {

        left = null;

    }


    @Override

    public KeyValue<K, Pair<V, V>> transform(K key, V value) {

        if (left == null) { left = value; return null; }

        KeyValue<K, Pair<V,V>> result = KeyValue.pair(key, new Pair<>(left, value));

        left = null;

        return result;

    }


    @Override

    public KeyValue<K, Pair<V, V>> punctuate(long timestamp) {

        return null;

    }


    public void close() { }


}

然后按如下方式使用 PairTransformerSupplier:


private void accumulateTwo(KStream<String, String> messages) {

    messages.transform(new PairTransformerSupplier<>())

            .filter((key, value) -> value != null)

            .mapValues(Pair::toString)

            .to("pairs");

}

但是,在具有单个分区的主题的单个进程中尝试这两种解决方案会产生完全相同的结果。我还没有尝试过具有多个分区和多个流消费者的主题。


查看完整回答
反对 回复 2021-10-13
?
慕妹3146593

TA贡献1820条经验 获得超9个赞

您应该能够编写一个累加器类


class Accumulator implements ValueMapper<String, Optional<Tuple<String>>> {

    private String key;


    public Optional<Tuple<String>> get(String item) {

        if (key == null) {

            key = item;

            return Optional.empty();

        }

        Optional<Tuple<String>> result = Optional.of(new Tuple<>(key, item));

        key = null;

        return result;

    }

}

然后处理


messages.mapValues(new Accumulator())

        .filter(Optional::isPresent) // I don't think your filter is correct

        .to("pairs");


查看完整回答
反对 回复 2021-10-13
  • 2 回答
  • 0 关注
  • 101 浏览

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信