I am getting an error while creating a Kafka JdbcSinkConnector (my task is to transfer data from a Kafka topic to a Postgres table):

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1

What does id -1 mean?

Connector settings:
{
  "name": "MVM Test",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": [
    "mvm_test_events"
  ],
  "connection.url": "jdbc:connection",
  "connection.user": "user",
  "connection.password": "*************"
}
I have also defined the (value) schema for the topic "mvm_test_events" in Control Center:
{
  "type": "record",
  "name": "event",
  "namespace": "mvm",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "series_storage",
      "type": "int"
    },
    {
      "name": "type",
      "type": "int"
    },
    {
      "name": "entity_id",
      "type": "int"
    },
    {
      "name": "processing_ts",
      "type": "double"
    },
    {
      "name": "from_ts",
      "type": "double"
    },
    {
      "name": "to_ts",
      "type": "string"
    },
    {
      "name": "context",
      "type": {
        "type": "record",
        "name": "context",
        "fields": [
          {
            "name": "trainName",
            "type": "string"
          }
        ]
      }
    }
  ]
}
Error log:
> [2020-01-22 06:45:10,380] ERROR Error encountered in task mvm-test-events-0. Executing stage 'VALUE_CONVERTER' with class 'io.confluent.connect.avro.AvroConverter', where consumed record is {topic='mvm_test_events', partition=0, offset=14, timestamp=1579615711794, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
> org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic mvm_test_events to Avro:
>     at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
> Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
> [2020-01-22 06:45:10,381] ERROR WorkerSinkTask{id=mvm-test-events-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic mvm_test_events to Avro:
>     at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>     ... 13 more
> Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
> Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
> [2020-01-22 06:45:10,381] ERROR WorkerSinkTask{id=mvm-test-events-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
As far as I can tell, it is trying to convert the records from the topic using io.confluent.connect.avro.AvroConverter. Should I now set the name of the schema (the one I defined in the topic settings) in the connector's "value converter class" setting?
You get the error

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

when you use the AvroConverter in Kafka Connect to read data that was not serialized as Avro.
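To see where the "id -1" comes from: the Confluent Schema Registry serializer frames every Avro message as a magic byte 0x00 followed by a 4-byte big-endian schema id, then the Avro payload. If the first byte is not 0x00, the deserializer cannot read a valid schema id and reports the placeholder id -1. A minimal sketch of that framing check in Python (this is an illustration, not the actual deserializer code):

```python
import struct

# Confluent wire format: [magic byte 0x00][4-byte big-endian schema id][Avro payload]
MAGIC_BYTE = 0x00

def check_wire_format(payload: bytes):
    """Return the schema id if the payload looks like Confluent-framed Avro, else None."""
    if len(payload) < 5 or payload[0] != MAGIC_BYTE:
        # This is the case behind "Unknown magic byte!": no valid schema id
        # can be read, so the error message falls back to id -1.
        return None
    return struct.unpack(">I", payload[1:5])[0]

# A plain JSON message fails the check:
print(check_wire_format(b'{"id": 1}'))                    # None
# A message from the Schema Registry serializer starts with 0x00 + schema id:
print(check_wire_format(b"\x00\x00\x00\x00\x07payload"))  # 7
```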
So you either need to fix the producer (so that it actually serializes the data as Avro), or, if you never intended to use Avro, fix your Kafka Connect connector configuration to use the appropriate converter.
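For example, if the data on the topic really is Avro, the converter can be set explicitly in the connector configuration rather than relying on the worker defaults. A sketch of the relevant keys (the Schema Registry URL is a placeholder, and StringConverter for keys is only an assumption about your key format):

```json
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
```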
See this article for details.
Edit: Based on your updated question, you do intend to write Avro, so using the AvroConverter is correct. You have not included it in your connector config, so I assume it is already set in your Kafka Connect worker properties ("value.converter": "io.confluent.connect.avro.AvroConverter"). Somehow, though, you have data on your topic that is not Avro. I would suggest setting up a dead letter queue to route those messages to a new topic for inspection, while letting your sink continue processing the messages that are valid Avro.
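A sketch of the dead letter queue settings for a sink connector (the DLQ topic name is a placeholder; the replication factor of 1 assumes a single-broker test cluster):

```json
{
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "mvm_test_events_dlq",
  "errors.deadletterqueue.topic.replication.factor": "1",
  "errors.deadletterqueue.context.headers.enable": "true"
}
```

With "errors.tolerance": "all", records that fail conversion are routed to the DLQ topic instead of killing the task, and the enabled context headers record why each message failed.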