如何正确地将JSON消息(使用kafka)发送到Jdbc-Sink(Sql服务器)



我对使用kafka有点陌生。我有一个简单的JSON消息,我正试图写入SQL服务器(使用Sink连接器(。这是我的简单JSON消息:

{
"DagId": "chat-bot-process-v1.0",
"RunId": "scheduled__2021-07-25T10:00:00+00:00",
"ChatKey": "82a4daf8-c1be-4524-bb80-ec252b38c020",
"ConversationId": "2158db2e-0bcc-48e6-a96e-3347e156a90a",
"EventTimestamp": null,
"EventType": "ASYNC",
"MessageType": "EPORTWEB",
"LastUpdateDatetime": "2022-09-21T17:05:51.473-04:00"
}

正如您所知,没有"schema"或"payload"结构例如

{
"schema": {
"type": "struct",
"fields": [
{
"type": "string",
"optional": true,
"field": "dagid"
},
...
"payload": {
"dagid": "live-person-process-v1.0",
}
}

然而,我正在阅读文档(https://rmoff.net/2021/03/12/kafka-connect-jdbc-sink-deep-dive-working-with-primary-keys/)它说您不能将纯JSON、CSV等与JDBC接收器连接器一起使用

所以我想我需要添加这个"schema"结构,并在"payload"中嵌套JSON。

我的问题是,最好的方法是什么?我已经通读了一些文档,上面说我可以使用Avro、JSONSchemaConverter或JSONConverter。我需要在我的java代码中添加这个吗?(你能为我的用例提供一个使用其中一个的例子吗?(

如果有帮助的话,我还可以添加我的接收器连接器配置或Java Producer配置。非常感谢。

我读过的文件,我认为适用:

  • https://www.baeldung.com/kafka-connectors-guide
  • https://rmoff.net/2021/03/12/kafka-connect-jdbc-sink-deep-dive-working-with-primary-keys/
  • https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html#connector
  • https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/
  • https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained/#non-json数据jsonconverter

实现的最佳方法

好吧,您可以将JSON值包装在payload对象中,然后添加schema。但这并不理想,因为这样会在主题中消耗更多的空间,因为每个事件都包含一个模式。

阅读说明我可以使用Avro、JSONSchemaConverter或JSONConverter 的文档

您也可以使用Protobuf。。。另外,前两个选项要求您运行Schema Registry服务器。但是,假设您想这样做,是的,您将依赖项添加到代码中,例如kafka-avro-serializer依赖项(或protobuf,或jsonschema(,然后相应地更改生产者配置中的value.serializer属性。

您可以参考Avro或Protobuf文档本身,了解如何设置您的项目,以从您的生产者可以使用

最新更新