Kafka Connect JDBC sink connector: trying to add AvroConverter Schema Registry details



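I am trying to load data from a Kafka cluster into an Oracle database. The data in the cluster was loaded using AvroConverter, and the schema details also contain fields such as source, op, and ts_ms. My problem is that when I try to read the data from the cluster, the connector tries to add these schema fields as additional columns to my target table and throws an error: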

INFO Unable to find fields [SinkRecordField{schema=Schema{Tab1.Value:STRUCT}, name='after', isPrimaryKey=false}, SinkRecordField{schema=Schema{io.debezium.connector.oracle.Source:STRUCT}, name='source', isPrimaryKey=false}, SinkRecordField{schema=Schema{Tab1.Value:STRUCT}, name='before', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRING}, name='op', isPrimaryKey=false}, SinkRecordField{schema=Schema{STRUCT}, name='transaction', isPrimaryKey=false}, SinkRecordField{schema=Schema{INT64}, name='ts_ms', isPrimaryKey=false}] among column names [col1, col2, col3, col4] (io.confluent.connect.jdbc.sink.DbStructure)
[2022-04-10 17:18:04,181] ERROR WorkerSinkTask{id=jdbc-conn} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: Cannot ALTER TABLE "Tab1" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.oracle.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value (org.apache.kafka.connect.runtime.WorkerSinkTask)
io.confluent.connect.jdbc.sink.TableAlterOrCreateException: Cannot ALTER TABLE "Tab1" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.oracle.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value
at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:182)

My connector details are listed below (it runs in a Docker container; the connection string has been removed for clarity):

{
  "name": "jdbc-conn",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "name": "jdbc-conn",
    "tasks.max": "1",
    "topics.regex": "customer.*",
    "transforms": "changeTopicName",
    "transforms.changeTopicName.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.changeTopicName.regex": "customer.(.*)",
    "transforms.changeTopicName.replacement": "$1",
    "connection.url": "jdbc:oracle:thin:user/pass@localhost:1521:customer",
    "connection.user": "user",
    "connection.password": "pass",
    "insert.mode": "upsert",
    "pk.mode": "record_key",
    "pk.fields": ""
  },
  "tasks": [],
  "type": "sink"
}

Docker container:

FROM confluentinc/cp-kafka-connect-base:latest
RUN   confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest

Started with:

docker run -d \
  --name=connect \
  --net=host \
  -e CONNECT_BOOTSTRAP_SERVERS="localhost:9092" \
  -e CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter" \
  -e CONNECT_CONSUMER_BOOTSTRAP_SERVERS="localhost:9092" \
  -e CONNECT_PRODUCER_BOOTSTRAP_SERVERS="localhost:9092" \
  -e CONNECT_GROUP_ID="sink_group1" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="connect-sink-offsets" \
  -e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR="3" \
  -e CONNECT_OFFSET_STORAGE_PARTITIONS="3" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="connect-sink-configs" \
  -e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR="3" \
  -e CONNECT_STATUS_STORAGE_TOPIC="connect-sink-status" \
  -e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR="3" \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO="xxx" \
  -e CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL="yyy" \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO="xxx" \
  -e CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL="yyy" \
  -e CONNECT_CONFLUENT_TOPIC_BOOTSTRAP_SERVERS="localhost:9092" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_LOG4J_ROOT_LOGLEVEL="INFO" \
  -e CONNECT_PLUGIN_PATH=/usr/share/java,/etc/kafka-connect/jars,/usr/share/confluent-hub-components \
  -v /home/cdcadmin/oracle_dest_connector/mounted/jars:/etc/kafka-connect/jars \
  jdbc-conn:latest

I do not want these fields added at all. What am I missing here?

"The schema details also contain fields such as source, op, and ts_ms"
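The field names in the log above (before, after, source, op, transaction, ts_ms) are those of a Debezium change-event envelope, while the table columns (col1 to col4) sit inside the after struct. Here is a minimal Python sketch of what flattening does to such a value. The sample values and the helper name `flatten_envelope` are hypothetical, and the function only imitates the idea; it is not Debezium's implementation.

```python
# Hypothetical Debezium-style change event value. The field names follow the
# log in the question; every value is invented for illustration.
envelope = {
    "before": None,
    "after": {"col1": 1, "col2": "a", "col3": "b", "col4": "c"},
    "source": {"connector": "oracle"},
    "op": "c",
    "transaction": None,
    "ts_ms": 1649611084181,
}


def flatten_envelope(value):
    """Return only the row state ("after"), dropping the envelope metadata.

    Illustration only: this imitates the idea behind event flattening,
    not the real transform's code or options.
    """
    if value is None:
        return None
    return value.get("after")


flat = flatten_envelope(envelope)
print(sorted(flat))  # the column names a sink would see after flattening
```

Without flattening, the sink sees the six top-level envelope fields, which is why it tries to ALTER the table to add them.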

You need to install and configure the Debezium ExtractNewRecordState transform to remove these fields.

https://debezium.io/documentation/reference/stable/transformations/event-flattening.html
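As a rough sketch of the configuration side, the transform can be chained in front of the existing RegexRouter. The Python below only builds the transform-related config keys and prints them as JSON. The alias `unwrap` and the helper name `with_unwrap` are arbitrary names chosen here, and the sketch assumes the Debezium jar that provides io.debezium.transforms.ExtractNewRecordState is already on the worker's plugin path.

```python
import json

# Transform-related keys copied from the connector config in the question.
existing = {
    "transforms": "changeTopicName",
    "transforms.changeTopicName.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.changeTopicName.regex": "customer.(.*)",
    "transforms.changeTopicName.replacement": "$1",
}


def with_unwrap(config, alias="unwrap"):
    """Return a copy of config with ExtractNewRecordState chained first.

    The alias is an arbitrary label (an assumption of this sketch).
    """
    updated = dict(config)
    # Keep any transforms that are already configured, in their original order.
    chain = [t for t in config.get("transforms", "").split(",") if t]
    updated["transforms"] = ",".join([alias] + chain)
    updated[f"transforms.{alias}.type"] = "io.debezium.transforms.ExtractNewRecordState"
    return updated


print(json.dumps(with_unwrap(existing), indent=2))
```

The printed keys would be merged into the "config" object of the connector, with every other setting left as it was. Kafka Connect applies transforms in the order they are listed.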

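OneCricketer's answer and comments, together with https://debezium.io/documentation/reference/stable/transformations/event-flattening.html, solved the problem. What I was missing was a correct installation of Debezium core. I had hoped ExtractNewRecordState would be part of confluentinc/kafka-connect-jdbc, but it is not. Since I need to stick with Confluent Hub, I decided to install debezium/debezium-connector-mysql (there is no Debezium Oracle connector available on Confluent Hub). My Dockerfile is now: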

FROM confluentinc/cp-kafka-connect-base:latest
RUN   confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:latest
RUN   confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest

The log finally shows:

INFO Added plugin 'io.debezium.transforms.ExtractNewRecordState'

The "Unable to find fields" error is resolved.

Now I have hit another one: "Error: Unsupported source data type: STRUCT", but that is a different matter. Thanks again.
