我正在尝试将 Jaeger 跟踪集成到 K-Streams 中。我计划向我的几个最重要的管道添加跟踪,并且想知道将 traceid 从一个管道传递到另一个管道的好方法是什么?这是我到目前为止所拥有的 - 在流处理管道开始时,我启动了一个服务器跨度并将 traceid 保存到状态存储中。稍后,在转换管道中,我访问 statestore 并从 transform() 方法捕获跟踪。这是在流处理中处理跟踪的好方法吗?input .mapValues(toSomethingThatMyAppUnderstands) .mapValues(this::startStreamTrace) .filter((k, v) -> v.isPresent()) .mapValues(Optional::get) .mapValues(doSomethingHereWith) .flatMapValues(doSomethingElse) .filter((k, v) -> isInterestingEvent(v)) .transform(() -> new TransformerWithTracing<SomeObjectA, SomeObjectB>(IN_MEM_STORE_NAME, someFunction), IN_MEM_STORE_NAME) .flatMapValues(c -> c) .to(outTopic, Produced.with(Serdes.String(), new EventSerde()));public class TransformerWithTracing<V, VR> implements Transformer<String, V, KeyValue<String, VR>> { final Function valueAction; final String storeId; private KeyValueStore<String, String> traceIdStore; public TransformerWithTracing(String storeId, Function valueAction) { this.storeId = storeId; this.valueAction = valueAction; } @Override public void init(ProcessorContext context) { // KeyValueStore store = ((KeyValueStore<String, String>) context.getStateStore(storeId)); InMemoryKeyValueStore inMemoryKeyValueStore = (InMemoryKeyValueStore) store; this.traceIdStore = store; } @Override public KeyValue<String, VR> transform(String key, V value) { System.out.println(traceIdStore.get(key)); // BuildTraceHeader try(Scope scope = serviceTracer.startServerSpan(traceHeader, "Converting to Enterprise Event")) { return KeyValue.pair(key, (VR) valueAction.apply(value)); } } @Override public KeyValue<String, VR> punctuate(long timestamp) { return null; } @Override public void close() {// if (streamId != null) traceIdStore.delete(streamId); }}
1 回答
MMTTMM
TA贡献1869条经验 获得超4个赞
@jeqo 在这个 zipkin/brave repo 中也有类似的想法。
https://github.com/jeqo/brave/tree/kafka-streams-processor/instrumentation/kafka-streams
opentracing-contrib repo 中似乎也有一些可用的东西,但它似乎只在跟踪生产者/消费者级别。
https://github.com/opentracing-contrib/java-kafka-client/tree/master/opentracing-kafka-streams
莱尼
添加回答
举报
0/150
提交
取消