Kafka JDBCSinkConnector schema exception: JsonConverter with schemas.enable requires "schema" and "payload"



I am trying to use the JDBCSinkConnector to stream data from a Kafka topic into Postgres. After creating the topic, creating the stream, creating the sink connector with the config below, and producing data into the topic with Python, the Connect log shows the following:

Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration

Here is the JSON schema (sch.json):

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "int32",
        "optional": false,
        "field": "id"
      },
      {
        "type": "string",
        "optional": false,
        "field": "url"
      }
    ],
    "optional": false,
    "name": "test_data"
  },
  "payload": {
    "id": 12,
    "url": "some_url"
  }
}

Here is the Kafka Connect connector configuration:

curl -X PUT http://localhost:8083/connectors/sink-jdbc-postgre-01/config \
  -H "Content-Type: application/json" -d '{
"connector.class"                    : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url"                     : "jdbc:postgresql://postgres:5432/",
"topics"                             : "test_topic06",
"key.converter"                      : "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable"       : "true",
"value.converter"                    : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable"     : "true",  
"connection.user"                    : "postgres",
"connection.password"                : "*******",
"auto.create"                        : true,
"auto.evolve"                        : true,
"insert.mode"                        : "insert",
"pk.mode"                            : "record_key",
"pk.fields"                          : "MESSAGE_KEY"
}'

Here is the Python code used to produce data to Kafka:

from kafka import KafkaProducer
import json

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'],
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

with open("../data/sch.json", 'r') as file:
    read = file.read()

for i in range(1):
    producer.send("test_topic06", value=read)

producer.close()

I then tried changing "key.converter.schemas.enable" and "value.converter.schemas.enable" to false, but the result in the logs was exactly the same.

Full log:

connect            | [2021-04-01 09:20:41,342] INFO MonitoringInterceptorConfig values: 
connect            |    confluent.monitoring.interceptor.publishMs = 15000
connect            |    confluent.monitoring.interceptor.topic = _confluent-monitoring
connect            |  (io.confluent.monitoring.clients.interceptor.MonitoringInterceptorConfig)
connect            | [2021-04-01 09:20:41,344] INFO ProducerConfig values: 
connect            |    acks = -1
connect            |    batch.size = 16384
connect            |    bootstrap.servers = [broker:29092]
connect            |    buffer.memory = 33554432
connect            |    client.dns.lookup = default
connect            |    client.id = confluent.monitoring.interceptor.connector-consumer-sink-jdbc-postgre-01-0
connect            |    compression.type = lz4
connect            |    connections.max.idle.ms = 540000
connect            |    delivery.timeout.ms = 120000
connect            |    enable.idempotence = false
connect            |    interceptor.classes = []
connect            |    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
connect            |    linger.ms = 500
connect            |    max.block.ms = 60000
connect            |    max.in.flight.requests.per.connection = 1
connect            |    max.request.size = 10485760
connect            |    metadata.max.age.ms = 300000
connect            |    metadata.max.idle.ms = 300000
connect            |    metric.reporters = []
connect            |    metrics.num.samples = 2
connect            |    metrics.recording.level = INFO
connect            |    metrics.sample.window.ms = 30000
connect            |    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
connect            |    receive.buffer.bytes = 32768
connect            |    reconnect.backoff.max.ms = 1000
connect            |    reconnect.backoff.ms = 50
connect            |    request.timeout.ms = 30000
connect            |    retries = 10
connect            |    retry.backoff.ms = 500
connect            |    sasl.client.callback.handler.class = null
connect            |    sasl.jaas.config = null
connect            |    sasl.kerberos.kinit.cmd = /usr/bin/kinit
connect            |    sasl.kerberos.min.time.before.relogin = 60000
connect            |    sasl.kerberos.service.name = null
connect            |    sasl.kerberos.ticket.renew.jitter = 0.05
connect            |    sasl.kerberos.ticket.renew.window.factor = 0.8
connect            |    sasl.login.callback.handler.class = null
connect            |    sasl.login.class = null
connect            |    sasl.login.refresh.buffer.seconds = 300
connect            |    sasl.login.refresh.min.period.seconds = 60
connect            |    sasl.login.refresh.window.factor = 0.8
connect            |    sasl.login.refresh.window.jitter = 0.05
connect            |    sasl.mechanism = GSSAPI
connect            |    security.protocol = PLAINTEXT
connect            |    security.providers = null
connect            |    send.buffer.bytes = 131072
connect            |    ssl.cipher.suites = null
connect            |    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
connect            |    ssl.endpoint.identification.algorithm = https
connect            |    ssl.key.password = null
connect            |    ssl.keymanager.algorithm = SunX509
connect            |    ssl.keystore.location = null
connect            |    ssl.keystore.password = null
connect            |    ssl.keystore.type = JKS
connect            |    ssl.protocol = TLS
connect            |    ssl.provider = null
connect            |    ssl.secure.random.implementation = null
connect            |    ssl.trustmanager.algorithm = PKIX
connect            |    ssl.truststore.location = null
connect            |    ssl.truststore.password = null
connect            |    ssl.truststore.type = JKS
connect            |    transaction.timeout.ms = 60000
connect            |    transactional.id = null
connect            |    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
connect            |  (org.apache.kafka.clients.producer.ProducerConfig)
connect            | [2021-04-01 09:20:41,349] INFO Kafka version: 5.5.0-ce (org.apache.kafka.common.utils.AppInfoParser)
connect            | [2021-04-01 09:20:41,349] INFO Kafka commitId: 6068e5d52c5e294e (org.apache.kafka.common.utils.AppInfoParser)
connect            | [2021-04-01 09:20:41,349] INFO Kafka startTimeMs: 1617268841349 (org.apache.kafka.common.utils.AppInfoParser)
connect            | [2021-04-01 09:20:41,349] INFO interceptor=confluent.monitoring.interceptor.connector-consumer-sink-jdbc-postgre-01-0 created for client_id=connector-consumer-sink-jdbc-postgre-01-0 client_type=CONSUMER session= cluster=K4nfs8sOSWCoI2_jEFzZ1Q group=connect-sink-jdbc-postgre-01 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect            | [2021-04-01 09:20:41,361] ERROR WorkerSinkTask{id=sink-jdbc-postgre-01-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
connect            | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:492)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
connect            |    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
connect            |    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect            |    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect            |    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect            |    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect            |    at java.lang.Thread.run(Thread.java:748)
connect            | Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
connect            |    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:359)
connect            |    at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
connect            |    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:492)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
connect            |    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
connect            |    ... 13 more
connect            | [2021-04-01 09:20:41,363] ERROR WorkerSinkTask{id=sink-jdbc-postgre-01-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
connect            | [2021-04-01 09:20:41,364] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask)
connect            | [2021-04-01 09:20:41,366] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-sink-jdbc-postgre-01-0] Cluster ID: K4nfs8sOSWCoI2_jEFzZ1Q (org.apache.kafka.clients.Metadata)
connect            | [2021-04-01 09:20:41,370] INFO [Consumer clientId=connector-consumer-sink-jdbc-postgre-01-0, groupId=connect-sink-jdbc-postgre-01] Revoke previously assigned partitions test_topic06-2, test_topic06-0, test_topic06-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect            | [2021-04-01 09:20:41,370] INFO [Consumer clientId=connector-consumer-sink-jdbc-postgre-01-0, groupId=connect-sink-jdbc-postgre-01] Member connector-consumer-sink-jdbc-postgre-01-0-a6013ad5-a778-4372-a9ab-a0c77119150b sending LeaveGroup request to coordinator broker:29092 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
connect            | [2021-04-01 09:20:41,379] INFO Publish thread interrupted for client_id=connector-consumer-sink-jdbc-postgre-01-0 client_type=CONSUMER session= cluster=K4nfs8sOSWCoI2_jEFzZ1Q group=connect-sink-jdbc-postgre-01 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect            | [2021-04-01 09:20:41,396] INFO Publishing Monitoring Metrics stopped for client_id=connector-consumer-sink-jdbc-postgre-01-0 client_type=CONSUMER session= cluster=K4nfs8sOSWCoI2_jEFzZ1Q group=connect-sink-jdbc-postgre-01 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect            | [2021-04-01 09:20:41,397] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-sink-jdbc-postgre-01-0] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
connect            | [2021-04-01 09:20:41,403] INFO Closed monitoring interceptor for client_id=connector-consumer-sink-jdbc-postgre-01-0 client_type=CONSUMER session= cluster=K4nfs8sOSWCoI2_jEFzZ1Q group=connect-sink-jdbc-postgre-01 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)

You are configuring the connector to parse JSON keys:

"key.converter"                      : "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable"       : "true",

but you are not sending any key:

producer.send("test_topic06", value=read)

You can either:

  • set key.converter to org.apache.kafka.connect.storage.StringConverter, or

  • also send a key with the same {schema: {}, payload: {}} structure (see the sketch after this list), e.g.
producer.send("test_topic06", key=key_value, value=read)
