It turns out this can be achieved with a branch processor, like so:
input:
  kafka:
    addresses:
      - localhost:9092
    consumer_group: benthos_consumer_group
    topics:
      - benthos_input

pipeline:
  processors:
    # Decode the message
    - schema_registry_decode:
        url: http://localhost:8081

    # Populate output content field
    - bloblang: |
        root.content = this

    # Decode kafka_key metadata payload and populate output metadata field
    - branch:
        request_map: |
          root = meta("kafka_key")

        processors:
          - schema_registry_decode:
              url: http://localhost:8081

        result_map: |
          root.metadata = meta()
          root.metadata.kafka_key = this

output:
  stdout: {}
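
To make the data flow of the branch step easier to follow, here is a rough Python analogy of what the pipeline above does to a single message. It is only a sketch, not Benthos code: `fake_decode` is a hypothetical stand-in for schema_registry_decode (it parses JSON instead of talking to a schema registry), `run_pipeline` is an invented name, and the metadata seen by result_map is assumed to be the same as the original message's metadata.

```python
import json


def fake_decode(payload: bytes) -> dict:
    """Hypothetical stand-in for schema_registry_decode: parses JSON bytes."""
    return json.loads(payload)


def run_pipeline(value: bytes, meta: dict) -> dict:
    """Mimic the three processors for one message (illustrative only)."""
    # 1. schema_registry_decode: decode the message value.
    decoded_value = fake_decode(value)

    # 2. bloblang: root.content = this
    root = {"content": decoded_value}

    # 3. branch
    # request_map: the temporary branch message is built from the kafka_key metadata.
    branch_payload = meta["kafka_key"]
    # Branch processors run on the temporary message only, so the
    # already-decoded content in `root` is left untouched.
    decoded_key = fake_decode(branch_payload)
    # result_map: copy all metadata into the payload, then replace the
    # raw kafka_key with the decoded branch result.
    root["metadata"] = dict(meta)
    root["metadata"]["kafka_key"] = decoded_key
    return root


# Made-up sample message: JSON value and JSON key, purely for illustration.
result = run_pipeline(
    b'{"name": "foo"}',
    {"kafka_key": b'{"id": 1}', "kafka_topic": "benthos_input"},
)
```

The point of the branch is that the key is decoded on a separate copy of the message, so the second decode never overwrites the value that was decoded in the first step; only the result_map decides what gets merged back.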