Have you seen the section of the documentation on serializing events?
https://github.com/cloudevents/sdk-go#serializedeserialize-a-cloudevent
event := cloudevents.NewEvent()
event.SetSource("example/uri")
event.SetType("example.type")
// data here is a map[string]interface{}, or some other struct type representing the "example.type" schema type above
if err := event.SetData(cloudevents.ApplicationJSON, data); err != nil {
	log.Fatal(err)
}
bytes, err := json.Marshal(event)
if err != nil {
	log.Fatal(err)
}
producerMsg := &sarama.ProducerMessage{
	Topic: s.outputTopic,
	// Value is a sarama.Encoder, so wrap the already-encoded event bytes
	Value: sarama.ByteEncoder(bytes),
}
Otherwise, be sure to look at the provided sample code, which uses the CloudEvents client: https://github.com/cloudevents/sdk-go/blob/main/samples/kafka/sender/main.go
sender, err := kafka_sarama.NewSender([]string{"127.0.0.1:9092"}, saramaConfig, "test-topic")
if err != nil {
	log.Fatalf("failed to create protocol: %s", err.Error())
}
defer sender.Close(context.Background())
c, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
	log.Fatalf("failed to create client, %v", err)
}
event := cloudevents.NewEvent()
event.Set...
c.Send(..., event)
...