我正在尝试使用JDBCSinkConnector将数据从Kafka主题传输到Postgres。在创建主题、创建流、创建带有配置的接收器连接器以及通过python-connect日志将数据生成到主题中等所有操作之后,返回以下结果:
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration
这是JSON模式(sch.JSON(的代码:
{
"schema":{
"type": "struct",
"fields": [
{
"type":"int32",
"optional": false,
"field": "id"
},
{
"type": "string",
"optional": false,
"field":"url"
],
"optional":false,
"name": "test_data"
},
"payload":{
"id": 12,
"url":"some_url"
}
}
这是kafka连接的代码:
curl -X PUT http://localhost:8083/connectors/sink-jdbc-postgre-01/config
-H "Content-Type: application/json" -d '{
"connector.class" : "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url" : "jdbc:postgresql://postgres:5432/",
"topics" : "test_topic06",
"key.converter" : "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable" : "true",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable" : "true",
"connection.user" : "postgres",
"connection.password" : "*******",
"auto.create" : true,
"auto.evolve" : true,
"insert.mode" : "insert",
"pk.mode" : "record_key",
"pk.fields" : "MESSAGE_KEY"
}'
这是用于向Kafka生成数据的python代码:
from kafka import KafkaProducer
import json
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'],value_serializer=lambda v: json.dumps(v).encode('utf-8'))
with open("../data/sch.json", 'r') as file:
read = file.read()
for i in range(1):
producer.send("test_topic06", value=read)
producer.close()
然后我尝试将"key.converter.schemas.enable"
和"value.converter.schemas.enable"
更改为false,但在日志中的结果完全相同。
完整日志:
2021-04-01 09:20:41,342] INFO MonitoringInterceptorConfig values:
connect | confluent.monitoring.interceptor.publishMs = 15000
connect | confluent.monitoring.interceptor.topic = _confluent-monitoring
connect | (io.confluent.monitoring.clients.interceptor.MonitoringInterceptorConfig)
connect | [2021-04-01 09:20:41,344] INFO ProducerConfig values:
connect | acks = -1
connect | batch.size = 16384
connect | bootstrap.servers = [broker:29092]
connect | buffer.memory = 33554432
connect | client.dns.lookup = default
connect | client.id = confluent.monitoring.interceptor.connector-consumer-sink-jdbc-postgre-01-0
connect | compression.type = lz4
connect | connections.max.idle.ms = 540000
connect | delivery.timeout.ms = 120000
connect | enable.idempotence = false
connect | interceptor.classes = []
connect | key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
connect | linger.ms = 500
connect | max.block.ms = 60000
connect | max.in.flight.requests.per.connection = 1
connect | max.request.size = 10485760
connect | metadata.max.age.ms = 300000
connect | metadata.max.idle.ms = 300000
connect | metric.reporters = []
connect | metrics.num.samples = 2
connect | metrics.recording.level = INFO
connect | metrics.sample.window.ms = 30000
connect | partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
connect | receive.buffer.bytes = 32768
connect | reconnect.backoff.max.ms = 1000
connect | reconnect.backoff.ms = 50
connect | request.timeout.ms = 30000
connect | retries = 10
connect | retry.backoff.ms = 500
connect | sasl.client.callback.handler.class = null
connect | sasl.jaas.config = null
connect | sasl.kerberos.kinit.cmd = /usr/bin/kinit
connect | sasl.kerberos.min.time.before.relogin = 60000
connect | sasl.kerberos.service.name = null
connect | sasl.kerberos.ticket.renew.jitter = 0.05
connect | sasl.kerberos.ticket.renew.window.factor = 0.8
connect | sasl.login.callback.handler.class = null
connect | sasl.login.class = null
connect | sasl.login.refresh.buffer.seconds = 300
connect | sasl.login.refresh.min.period.seconds = 60
connect | sasl.login.refresh.window.factor = 0.8
connect | sasl.login.refresh.window.jitter = 0.05
connect | sasl.mechanism = GSSAPI
connect | security.protocol = PLAINTEXT
connect | security.providers = null
connect | send.buffer.bytes = 131072
connect | ssl.cipher.suites = null
connect | ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
connect | ssl.endpoint.identification.algorithm = https
connect | ssl.key.password = null
connect | ssl.keymanager.algorithm = SunX509
connect | ssl.keystore.location = null
connect | ssl.keystore.password = null
connect | ssl.keystore.type = JKS
connect | ssl.protocol = TLS
connect | ssl.provider = null
connect | ssl.secure.random.implementation = null
connect | ssl.trustmanager.algorithm = PKIX
connect | ssl.truststore.location = null
connect | ssl.truststore.password = null
connect | ssl.truststore.type = JKS
connect | transaction.timeout.ms = 60000
connect | transactional.id = null
connect | value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
connect | (org.apache.kafka.clients.producer.ProducerConfig)
connect | [2021-04-01 09:20:41,349] INFO Kafka version: 5.5.0-ce (org.apache.kafka.common.utils.AppInfoParser)
connect | [2021-04-01 09:20:41,349] INFO Kafka commitId: 6068e5d52c5e294e (org.apache.kafka.common.utils.AppInfoParser)
connect | [2021-04-01 09:20:41,349] INFO Kafka startTimeMs: 1617268841349 (org.apache.kafka.common.utils.AppInfoParser)
connect | [2021-04-01 09:20:41,349] INFO interceptor=confluent.monitoring.interceptor.connector-consumer-sink-jdbc-postgre-01-0 created for client_id=connector-consumer-sink-jdbc-postgre-01-0 client_type=CONSUMER session= cluster=K4nfs8sOSWCoI2_jEFzZ1Q group=connect-sink-jdbc-postgre-01 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect | [2021-04-01 09:20:41,361] ERROR WorkerSinkTask{id=sink-jdbc-postgre-01-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
connect | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:492)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:469)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
connect | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
connect | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
connect | at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
connect | at java.util.concurrent.FutureTask.run(FutureTask.java:266)
connect | at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
connect | at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
connect | at java.lang.Thread.run(Thread.java:748)
connect | Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
connect | at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:359)
connect | at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
connect | at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:492)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
connect | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
connect | ... 13 more
connect | [2021-04-01 09:20:41,363] ERROR WorkerSinkTask{id=sink-jdbc-postgre-01-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
connect | [2021-04-01 09:20:41,364] INFO Stopping task (io.confluent.connect.jdbc.sink.JdbcSinkTask)
connect | [2021-04-01 09:20:41,366] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-sink-jdbc-postgre-01-0] Cluster ID: K4nfs8sOSWCoI2_jEFzZ1Q (org.apache.kafka.clients.Metadata)
connect | [2021-04-01 09:20:41,370] INFO [Consumer clientId=connector-consumer-sink-jdbc-postgre-01-0, groupId=connect-sink-jdbc-postgre-01] Revoke previously assigned partitions test_topic06-2, test_topic06-0, test_topic06-1 (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
connect | [2021-04-01 09:20:41,370] INFO [Consumer clientId=connector-consumer-sink-jdbc-postgre-01-0, groupId=connect-sink-jdbc-postgre-01] Member connector-consumer-sink-jdbc-postgre-01-0-a6013ad5-a778-4372-a9ab-a0c77119150b sending LeaveGroup request to coordinator broker:29092 (id: 2147483646 rack: null) due to the consumer is being closed (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
connect | [2021-04-01 09:20:41,379] INFO Publish thread interrupted for client_id=connector-consumer-sink-jdbc-postgre-01-0 client_type=CONSUMER session= cluster=K4nfs8sOSWCoI2_jEFzZ1Q group=connect-sink-jdbc-postgre-01 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect | [2021-04-01 09:20:41,396] INFO Publishing Monitoring Metrics stopped for client_id=connector-consumer-sink-jdbc-postgre-01-0 client_type=CONSUMER session= cluster=K4nfs8sOSWCoI2_jEFzZ1Q group=connect-sink-jdbc-postgre-01 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
connect | [2021-04-01 09:20:41,397] INFO [Producer clientId=confluent.monitoring.interceptor.connector-consumer-sink-jdbc-postgre-01-0] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)
connect | [2021-04-01 09:20:41,403] INFO Closed monitoring interceptor for client_id=connector-consumer-sink-jdbc-postgre-01-0 client_type=CONSUMER session= cluster=K4nfs8sOSWCoI2_jEFzZ1Q group=connect-sink-jdbc-postgre-01 (io.confluent.monitoring.clients.interceptor.MonitoringInterceptor)
您正在设置连接器以解析JSON
密钥
"key.converter" : "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable" : "true",
但你没有按任何键
producer.send("test_topic06", value=read)
你能吗
- 将
key.converter
设置为org.apache.kafka.connect.storage.StringConverter
或
- 还将具有相同
{schema: {}, payload:{}}
结构的密钥与
producer.send("test_topic06", key=key_value, value=read)