以cloudevents格式发送数据到Kafka主题



现在我有这个代码,它工作得很好。(它发送一些json格式的数据到Kafka主题)

j, err := json.Marshal(data)
if err != nil {
log.Fatal(err)
}
msg := &sarama.ProducerMessage{
Topic: tName,
Value: sarama.StringEncoder(j),
}
_, _, err = producer.SendMessage(msg)

但是有些人希望这些数据是云事件格式的。→https://github.com/cloudevents/sdk-go那么我该怎么办呢,因为这个事件结构不能直接转换为字符串。

type Event struct {
Context     EventContext
DataEncoded []byte
// DataBase64 indicates if the event, when serialized, represents
// the data field using the base64 encoding.
// In v0.3, this field is superseded by DataContentEncoding
DataBase64  bool
FieldErrors map[string]error
}

所以这段代码甚至不能编译

j, err := json.Marshal(data)
if err != nil {
log.Fatal(err)
}
//...
event := cloudevents.NewEvent()
event.SetSource("example/uri") 
event.SetType("example.type")
event.SetData(cloudevents.ApplicationJSON, j)
producerMsg := &sarama.ProducerMessage{
Topic: s.outputTopic,
Value: sarama.StringEncoder(event),
}
_, _, err = s.producer.SendMessage(producerMsg)

我应该怎么做才能把这个事件发送给Kafka?尝试释放事件。数据编码为字符串之类的?顺便说一句。编程语言是golang.

您看到文档中序列化事件的部分了吗?

https://github.com/cloudevents/sdk-go serializedeserialize-a-cloudevent

event := cloudevents.NewEvent()
event.SetSource("example/uri") 
event.SetType("example.type") 
// data here is a map[string] interface{}, or some other Struct type representing the "example.type" schema type above 
event.SetData(cloudevents.ApplicationJSON, data)
bytes, err := json.Marshal(event)
if err != nil {
log.Fatal(err)
}
producerMsg := &sarama.ProducerMessage{
Topic: s.outputTopic,
Value: bytes,  // you've already encoded the event 
}

否则,请务必查看所提供的使用CloudEvent客户机的示例代码https://github.com/cloudevents/sdk-go/blob/main/samples/kafka/sender/main.go

sender, err := kafka_sarama.NewSender([]string{"127.0.0.1:9092"}, saramaConfig, "test-topic")
if err != nil {
log.Fatalf("failed to create protocol: %s", err.Error())
}
defer sender.Close(context.Background())
c, err := cloudevents.NewClient(sender, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
if err != nil {
log.Fatalf("failed to create client, %v", err)
}
event := cloudevents.NewEvent() 
event.Set... 
c.Send(..., event)
... 

相关内容

  • 没有找到相关文章

最新更新