Kafka Connect JdbcSourceConnector deserialization problem



I'm using Kafka Connect to pull from a database so I can store the data on a compacted topic, and I'm running into a deserialization problem when trying to consume that topic in a Spring Cloud Stream application.

Connector config:

{
  "name": "my-connector",
  "config": {
    "name": "my-connector",
    "poll.interval.ms": "86400000",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "oracle-jdbc-string",
    "connection.user": "testid",
    "connection.password": "test",
    "catalog.pattern": "mySchema",
    "table.whitelist": "MY_TABLE",
    "table.types": "TABLE",
    "mode": "bulk",
    "numeric.mapping": "best_fit",
    "transforms": "createKey, extractCode, UpdateTopicName",
    "transforms.UpdateTopicName.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.extractCode.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extractCode.field": "ID",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "ID",
    "transforms.UpdateTopicName.regex": "(.*)",
    "transforms.UpdateTopicName.replacement": "my_topic",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "topic.prefix": "nt_mny_"
  }
}
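For reference, the three SMTs in the `transforms` chain above reshape each record in sequence. A minimal sketch of their combined effect, modeled with plain Java maps rather than Kafka Connect's real Struct/SMT classes (the topic name `nt_mny_MY_TABLE` is just an illustrative input):

```java
import java.util.Map;
import java.util.regex.Pattern;

// Hedged sketch of what the configured SMT chain does to each source record.
public class SmtChainSketch {
    public static void main(String[] args) {
        Map<String, Object> value = Map.of("ID", 7247, "TYP_CDE", "TEST");

        // 1. createKey (ValueToKey): build a key struct from the ID field of the value.
        Map<String, Object> key = Map.of("ID", value.get("ID"));

        // 2. extractCode (ExtractField$Key): unwrap the key struct to the bare ID.
        Object bareKey = key.get("ID");

        // 3. UpdateTopicName (RegexRouter): "(.*)" matches any topic name,
        //    and the whole thing is replaced with "my_topic".
        String topic = Pattern.compile("(.*)")
                .matcher("nt_mny_MY_TABLE")
                .replaceFirst("my_topic");

        System.out.println("topic=" + topic + " key=" + bareKey);
    }
}
```

So every record lands on `my_topic` with a bare `ID` key, which matches the key `7247` seen in the console-consumer output below.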

The connector seems to work fine and puts the expected messages on the topic. When I use the Kafka console consumer, a sample message looks like this:

kafka-console-consumer --bootstrap-server localhost.ntrs.com:9092 --topic nt_mny_ece_alert_avro --from-beginning  --property print.key=true | jq '.'
7247
0
{
  "ID": 7247,
  "USER_SK": 5623,
  "TYP_CDE": "TEST",
  "ALRT_ACTIVE_FLAG": "Y",
  "ALRT_DESC": "My Alert",
  "ALRT_STATUS": "VISIBLE",
  "CREAT_BY": "ME",
  "CREAT_TM": 1593547299565,
  "UPD_BY": "ME",
  "UPD_TM": 1593547299565
}

I'm wondering whether the 0 printed between the key and the value is the problem, or just Kafka noise.

The error I see in my code is:

org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7); nested exception is java.io.CharConversionException: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7)

And my processor/sink code is relatively simple:

@StreamListener
public void process(
        @Input(MyAlertsProcessor.MY_ALERT_AVRO) KStream<String, Json> myAlertKconnectStream) {
    myAlertKconnectStream.peek((key, value) -> {
        System.out.println("HELOOOOOO");
        logger.debug("ece/pre: key={}, value={}", key, value);
    });
}

I've spent days trying to figure this out with very little to show for it; any help is appreciated!

You're using the JSON Schema converter (io.confluent.connect.json.JsonSchemaConverter), not the plain JSON converter (org.apache.kafka.connect.json.JsonConverter).

The JSON Schema converter stores the schema in the Schema Registry and puts information about it in the first few bytes of each message. That is what is tripping up your code (Could not read JSON: Invalid UTF-32 character 0x17a2241 (above 0x0010ffff) at char #1, byte #7): your consumer tries to read those header bytes as JSON text.
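A minimal sketch of that framing, assuming the standard Schema Registry wire format (a 0x00 magic byte followed by a 4-byte big-endian schema ID, then the payload; the schema ID 42 and payload bytes here are made up):

```java
import java.nio.ByteBuffer;

// Hedged sketch: peel the Schema Registry framing off the front of a message.
public class WireFormatPeek {
    public static void main(String[] args) {
        // Simulated start of a framed message: magic byte, schema ID, then JSON text.
        byte[] message = new byte[] {0, 0, 0, 0, 42, '{', '"', 'I', 'D', '"'};
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte magic = buf.get();      // 0x00 in the current wire format
        int schemaId = buf.getInt(); // 4-byte big-endian schema ID (42 here)
        System.out.println("magic=" + magic + " schemaId=" + schemaId);
    }
}
```

This is also why the console consumer shows a stray 0 between the key and the value: it is the magic byte at the start of the framed value, not noise from Kafka itself.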

So either use the JSON Schema deserializer in your consuming code (better), or switch the connector to the org.apache.kafka.connect.json.JsonConverter converter (less good; you then throw away the schema).
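For the first option, one possible shape for the Spring Cloud Stream Kafka Streams binder config is sketched below. This is a hypothetical fragment, assuming Confluent's `KafkaJsonSchemaSerde` from the kafka-streams-json-schema-serde artifact and a binding named `my-alert-avro`; verify the property names against your binder version:

```yaml
# Hypothetical application.yml fragment -- check property names for your binder version.
spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              schema.registry.url: http://schema-registry:8081
          bindings:
            my-alert-avro:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
```

With a schema-aware serde on the consumer side, the framing bytes are handled for you instead of being fed to a plain JSON parser.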

More details here: https://rmoff.net/2020/07/03/why-json-isnt-the-same-as-json-schema-in-kafka-connect-converters-and-ksqldb-viewing-kafka-messages-bytes-as-hex/
