Kafka JDBC sink connector throws org.apache.kafka.connect.errors.DataException (Struct schema's field name not specified properly) when inserting into a PG table



I am using the kafka-jdbc-sink-connector in my project. I publish some JSON to a Kafka topic (kafka_subs), and the JDBC sink connector should then insert each record into a Postgres table (subs) under the schema (TESTDB). However, I get the exception below.

The Kafka Connect image version is confluentinc/cp-kafka-connect:latest, and I run Kafka Connect from Docker Compose.

Below is the JDBC sink connector configuration:

curl -X POST http://localhost:8082/connectors -H "Content-Type: application/json" -d '{
  "name": "jdbc_sink_postgres_022",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/postgres",
    "connection.user": "postgres",
    "connection.password": "postgres",
    "topics": "kafka_subs",
    "auto.create": "true",
    "insert.mode": "insert",
    "table.name.format": "TESTDB.subs",
    "mode": "bulk",
    "pk.mode": "none",
    "poll.interval.ms": 60000,
    "value.converter.schemas.enable": "true",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "schemas.enable": "true"
  }
}'
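Once the connector is created, the task state and the underlying failure message can also be read back through the Connect REST API (a quick check, assuming the worker is still reachable on localhost:8082 as in the POST above):

curl http://localhost:8082/connectors/jdbc_sink_postgres_022/status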

Here is the record I publish to the Kafka topic using ./bin/kafka-console-producer.sh:

{
  "schema": {
    "type": "struct",
    "name": "TESTDB",
    "optional": false,
    "fields": [{
      "name": "sub_id",
      "optional": false,
      "type": "string"
    }, {
      "name": "sub_name",
      "optional": false,
      "type": "string"
    }]
  },
  "payload": {
    "sub_id": "10000",
    "sub_name": "Sssss"
  }
}
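For reference, the producer invocation looks roughly like this, assuming a broker listening on localhost:9092 (older Kafka distributions take --broker-list instead of --bootstrap-server); the JSON above is then entered as a single line:

./bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic kafka_subs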

Below is the exception:

[2021-09-09 16:18:36,705] ERROR WorkerSinkTask{id=jdbc_sink_postgres_022-0} Error converting message value in topic 'kafka_subs' partition 0 at offset 0 and timestamp 1631204315678: Struct schema's field name not specified properly (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.DataException: Struct schema's field name not specified properly
at org.apache.kafka.connect.json.JsonConverter.asConnectSchema(JsonConverter.java:534)
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:382)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:545)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:501)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
[2021-09-09 16:18:36,707] ERROR WorkerSinkTask{id=jdbc_sink_postgres_022-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:501)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:478)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:189)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:238)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
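The second ERROR ("Tolerance exceeded in error handler") is just a consequence of the first: with the default errors.tolerance=none, a single conversion failure kills the task. While debugging, failing records can instead be routed to a dead letter queue by adding properties like these to the "config" block above (the DLQ topic name here is only an illustration):

  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "dlq_kafka_subs",
  "errors.deadletterqueue.context.headers.enable": "true"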

Am I doing something wrong? Or how can I get this simple JSON inserted into the Postgres table?

Thanks,

This issue is now solved for me: I changed "name" to "field" inside the "fields" array, and with that change it works fine.

The previous JSON was:

{
  "schema": {
    "type": "struct",
    "name": "TESTDB",
    "optional": false,
    "fields": [{
      "name": "sub_id",
      "optional": false,
      "type": "string"
    }, {
      "name": "sub_name",
      "optional": false,
      "type": "string"
    }]
  },
  "payload": {
    "sub_id": "10000",
    "sub_name": "Sssss"
  }
}

The changed JSON is:

{
  "schema": {
    "type": "struct",
    "name": "TESTDB",
    "optional": false,
    "fields": [{
      "field": "sub_id",
      "optional": false,
      "type": "string"
    }, {
      "field": "sub_name",
      "optional": false,
      "type": "string"
    }]
  },
  "payload": {
    "sub_id": "10001",
    "sub_name": "Ggggg"
  }
}
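This matches what JsonConverter expects: in its schema envelope, each entry of the "fields" array carries the field name under a "field" key, while "name" is reserved for the schema name itself, and the check that rejects a missing "field" key is exactly what throws "Struct schema's field name not specified properly". For reference, here is a minimal Java sketch of the equivalent schema built with the Kafka Connect data API (assuming the org.apache.kafka:connect-api jar is on the classpath); JsonConverter serializes a Struct built this way into the envelope shown above:

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class SubsRecord {
    public static void main(String[] args) {
        // Struct schema with two required string fields; the name passed to
        // field() is what JsonConverter emits under the "field" key, and
        // STRING_SCHEMA is non-optional, matching "optional": false.
        Schema schema = SchemaBuilder.struct()
                .name("TESTDB")
                .field("sub_id", Schema.STRING_SCHEMA)
                .field("sub_name", Schema.STRING_SCHEMA)
                .build();

        Struct value = new Struct(schema)
                .put("sub_id", "10001")
                .put("sub_name", "Ggggg");

        System.out.println(value);
    }
}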
