ksql-CREATE TABLE即使填充了kafka主题,也会导致表中的值为null



使用ksqlDB,我创建了一个带有自定义查询的JDBC连接器。然后,根据由此产生的卡夫卡主题,我创建了一个表。但是,从表中进行选择只会为PRIMARY KEY返回数据,而为所有其他值返回null。我正在连接的postgres数据库的销售表不断更新新数据,我正试图使用ksql对这些数据进行流式传输。

ksql> CREATE SOURCE CONNECTOR con WITH (
'connector.class'      ='io.confluent.connect.jdbc.JdbcSourceConnector',
'connection.url'       = '....',
'topic.prefix'         = 'sales',
...
'key'                  = 'id',
'query'                = 'SELECT id, time, price FROM sales');
Message
Created connector CON
ksql> print sales limit 1;
Key format: HOPPING(KAFKA_STRING) or TUMBLING(KAFKA_STRING) or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 2020/11/30 09:07:55.109 Z, key: [123], value: {"schema":{"type":"struct","fields":[{"type":"string","optional":alse,"field":"id"},{"type":"int64","optional":true,"field":"time"},{"type":"float","optional":true,"field":"price"}],"optional":false},"payload":{"id":"123","time":1,"price":10.0}}
Topic printing ceased
ksql> CREATE TABLE sales_table (id VARCHAR PRIMARY KEY, time INT, price DOUBLE) WITH (kafka_topic='sales', partitions=1, value_format='JSON');
Message
Table created
ksql> SELECT * FROM sales_table EMIT CHANGES LIMIT 1;
+-----+-----+-----+
|ID   |TIME |PRICE|
+-----+-----+-----+
|123  |null |null |
Limit Reached
Query terminated

正如您所看到的,kafka主题在时间和价格字段中有具有适当值的条目。但是,当在该主题上创建表时,从表中进行选择会产生空的时间和价格字段。只有id(PRIMARY KEY列(才能正确打印。

知道为什么会发生这种事吗?

您在带有schemas.enable=true的连接器中使用org.apache.kafka.connect.json.JsonConverter转换器,因此您的架构不是(id VARCHAR PRIMARY KEY, time INT, price DOUBLE),因此您得到的值为NULL。

更好的方法是在源连接器中使用io.confluent.connect.avro.AvroConverter(或Protobuf,或JSON模式(,因为这样你甚至不必为CREATE STREAM键入模式,你只需要

CREATE TABLE sales_table  WITH (kafka_topic='sales', value_format='AVRO');

您这样指定替代转换器:

CREATE SOURCE CONNECTOR SOURCE_01 WITH (
…
'key.converter'= 'org.apache.kafka.connect.storage.StringConverter',
'value.converter'= 'io.confluent.connect.avro.AvroConverter',
'value.converter.schema.registry.url'= 'http://schema-registry:8081'
);

但是,如果必须使用JSON,请在源连接器中禁用模式:

CREATE SOURCE CONNECTOR SOURCE_01 WITH (
…
'value.converter.schemas.enable'= 'false'
);

参考编号:https://www.confluent.io/blog/kafka-connect-deep-dive-converters-serialization-explained

最新更新