我正试图使用Kafka Connect JDBC接收器连接器将数据插入Oracle,但它引发了一个错误。我已经尝试了该模式的所有可能配置。以下是示例。
如果我缺少任何东西,请建议我以下是我的配置文件和错误。
案例1-首次配置
internal.value.converter.schemas.enable=false .
所以我得到
[2017-08-28 16:16:26,119] INFO Sink task WorkerSinkTask{id=oracle_sink-0} finished initialization and start (org.apache.kafka.connect.runtime.WorkerSinkTask:233)
[2017-08-28 16:16:26,606] INFO Discovered coordinator dfw-appblx097-01.prod.walmart.com:9092 (id: 2147483647 rack: null) for group connect-oracle_sink. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:597)
[2017-08-28 16:16:26,608] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-08-28 16:16:26,609] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-08-28 16:16:27,174] INFO Successfully joined group connect-oracle_sink with generation 26 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
[2017-08-28 16:16:27,176] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, DJ-8] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
[2017-08-28 16:16:28,580] ERROR Task oracle_sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:455)
org.apache.kafka.connect.errors.ConnectException: No fields found using key and value schemas for table: DJ
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:190)
at io.confluent.connect.jdbc.sink.metadata.FieldsMetadata.extract(FieldsMetadata.java:58)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:65)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:62)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:66)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:435)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:251)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:180)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:148)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
第二种配置-
internal.key.converter.schemas.enable=true
internal.value.converter.schemas.enable=true
日志:
[2017-08-28 16:23:50,993] INFO Revoking previously assigned partitions [] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:419)
[2017-08-28 16:23:50,993] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-08-28 16:23:51,260] INFO (Re-)joining group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:432)
[2017-08-28 16:23:51,381] INFO Successfully joined group connect-oracle_sink with generation 29 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:399)
[2017-08-28 16:23:51,384] INFO Setting newly assigned partitions [DJ-7, DJ-6, DJ-5, DJ-4, DJ-3, DJ-2, DJ-1, DJ-0, DJ-9, DJ-8] for group connect-oracle_sink (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:262)
[2017-08-28 16:23:51,727] ERROR Task oracle_sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:308)
Oracle connector.properties看起来像
name=oracle_sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=DJ
connection.url=jdbc:oracle:thin:@hostname:port:sid
connection.user=username
connection.password=password
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
auto.create=true
auto.evolve=true
Connect-Standalone.properties
我的JSON看起来像-
{"Item":"12","Sourcing Reason":"corr","Postal Code":"l45","OrderNum":"10023","Intended Node Distance":1125.8,"Chosen Node":"34556","Quantity":1,"Order Date":1503808765201,"Intended Node":"001","Chosen Node Distance":315.8,"Sourcing Logic":"reducesplits"}
根据文档
接收器连接器需要了解模式,因此您应该使用合适的转换器,例如模式注册表附带的Avro转换器,或启用模式的JSON转换器。
因此,如果您的数据是JSON,您将拥有以下配置:
[...]
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
[...]
您在第二个实例中看到的错误是相关的——JsonConverter with schemas.enable requires "schema" and "payload" fields
——您共享的JSON不符合此要求的格式。
下面是一个带有schema
和payload
:的有效JSON消息的简单示例
{
"schema": {
"type": "struct",
"fields": [{
"type": "int32",
"optional": true,
"field": "c1"
}, {
"type": "string",
"optional": true,
"field": "c2"
}, {
"type": "int64",
"optional": false,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "create_ts"
}, {
"type": "int64",
"optional": false,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "update_ts"
}],
"optional": false,
"name": "foobar"
},
"payload": {
"c1": 10000,
"c2": "bar",
"create_ts": 1501834166000,
"update_ts": 1501834166000
}
}
您试图登录到Oracle的数据的来源是什么?如果是Kafka Connect入站,那么只需使用相同的converter
配置(Avro+Confluent Schema Registry(就会更容易、更高效。如果它是一个自定义应用程序,您需要让它(a(使用Confluent Avro serializer,或者(b(以上面所需的格式编写JSON,并在消息中提供有效负载的模式。
在阅读了这篇文章后,我也遇到了同样的问题。我已经用JDBC接收器MySQL解决了问题下面是我的Kafka Connect配置,作为附加信息:
curl --location --request POST 'http://localhost:8083/connectors/'
--header 'Accept: application/json'
--header 'Content-Type: application/json'
--data-raw '{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "ttib-transactions",
"connection.url": "jdbc:mysql://172.17.0.1:6603/tt-tran?verifyServerCertificate=true&useSSL=false",
"connection.user": "root",
"connection.password": "*******",
"value.converter.schema.registry.url": "https://psrc-j55zm.us-central1.gcp.confluent.cloud",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "false",
"insert.mode": "insert",
"batch.size":"0",
"table.name.format": "${topic}",
"pk.fields" :"id"
}
}'