SerializationException: Error deserializing Avro message



I get an error when creating a Kafka JdbcSinkConnector (my task is to move data from a Kafka topic into a Postgres table):

Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1

What does id -1 mean?

Connector settings:

{
  "name": "MVM Test",
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "topics": [
    "mvm_test_events"
  ],
  "connection.url": "jdbc:connection",
  "connection.user": "user",
  "connection.password": "*************"
}

I have also defined the (value) schema for the topic "mvm_test_events" in Control Center:

{
  "type": "record",
  "name": "event",
  "namespace": "mvm",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "series_storage",
      "type": "int"
    },
    {
      "name": "type",
      "type": "int"
    },
    {
      "name": "entity_id",
      "type": "int"
    },
    {
      "name": "processing_ts",
      "type": "double"
    },
    {
      "name": "from_ts",
      "type": "double"
    },
    {
      "name": "to_ts",
      "type": "string"
    },
    {
      "name": "context",
      "type": {
        "type": "record",
        "name": "context",
        "fields": [
          {
            "name": "trainName",
            "type": "string"
          }
        ]
      }
    }
  ]
}

Error log:

> [2020-01-22 06:45:10,380] ERROR Error encountered in task
> mvm-test-events-0. Executing stage 'VALUE_CONVERTER' with class
> 'io.confluent.connect.avro.AvroConverter', where consumed record is
> {topic='mvm_test_events', partition=0, offset=14,
> timestamp=1579615711794, timestampType=CreateTime}.
> (org.apache.kafka.connect.runtime.errors.LogReporter)
> org.apache.kafka.connect.errors.DataException: Failed to deserialize
> data for topic mvm_test_events to Avro:   at
> io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
>   at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>   at
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>   at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748) Caused by:
> org.apache.kafka.common.errors.SerializationException: Error
> deserializing Avro message for id -1 Caused by:
> org.apache.kafka.common.errors.SerializationException: Unknown magic
> byte! [2020-01-22 06:45:10,381] ERROR
> WorkerSinkTask{id=mvm-test-events-0} Task threw an uncaught and
> unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded
> in error handler  at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:487)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:464)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:320)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
>   at
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>   at
> org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>   at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)     at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748) Caused by:
> org.apache.kafka.connect.errors.DataException: Failed to deserialize
> data for topic mvm_test_events to Avro:   at
> io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:107)
>   at
> org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:487)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
>   at
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
>   ... 13 more Caused by:
> org.apache.kafka.common.errors.SerializationException: Error
> deserializing Avro message for id -1 Caused by:
> org.apache.kafka.common.errors.SerializationException: Unknown magic
> byte! [2020-01-22 06:45:10,381] ERROR
> WorkerSinkTask{id=mvm-test-events-0} Task is being killed and will not
> recover until manually restarted
> (org.apache.kafka.connect.runtime.WorkerTask)

As far as I understand, it tries to convert the records from the topic using io.confluent.connect.avro.AvroConverter. Should I now specify the schema name (the one I defined in the topic settings) in the connector's value converter setting?

You get the error

org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!

when you use the AvroConverter in Kafka Connect to read data that has not actually been serialized as Avro.
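The "id -1" in the error refers to the schema id the deserializer tries to read from the message header. A message produced with a schema-registry-aware Avro serializer starts with a magic byte 0x00 followed by a 4-byte big-endian schema id, then the Avro payload; if the first byte is anything else, deserialization fails before a valid id can be read. A minimal sketch in plain Python that mimics this check (the function name is mine, not part of any library):

```python
import struct

MAGIC_BYTE = 0x00

def parse_wire_format(raw: bytes):
    """Parse the schema-registry wire format header.

    Returns (schema_id, avro_payload). Raises ValueError when the
    message was not produced with a schema-registry-aware serializer,
    which is the situation behind "Unknown magic byte!".
    """
    if len(raw) < 5 or raw[0] != MAGIC_BYTE:
        raise ValueError("Unknown magic byte!")
    schema_id = struct.unpack(">I", raw[1:5])[0]
    return schema_id, raw[5:]

# A correctly framed message: magic byte, schema id 42, then the payload
framed = b"\x00" + struct.pack(">I", 42) + b"\x02a"
print(parse_wire_format(framed)[0])  # 42

# A plain-JSON message trips the same check the AvroConverter hits
try:
    parse_wire_format(b'{"id": 1}')
except ValueError as e:
    print(e)  # Unknown magic byte!
```

So a topic full of raw JSON (or any non-Avro bytes) will fail on the very first byte, which is why the reported id is the initial -1 rather than a real schema id.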

So you either need to fix the producer (and serialize the data as Avro correctly), or, if you never intended to use Avro, fix your Kafka Connect connector configuration to use the appropriate converter.
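For the non-Avro case, a sketch of the connector-level override, assuming the topic actually holds schemaless JSON:

```json
{
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
```

Note that the JDBC sink needs a declared schema to build table rows, so with JSON you would typically keep `schemas.enable` on and embed the schema in each message instead.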

See this article for more details.

Edit: based on your updated question, you do intend to write Avro, so using the AvroConverter is correct. You have not included it in your connector configuration, so I assume it is already set in your Kafka Connect worker properties ("value.converter": "io.confluent.connect.avro.AvroConverter"). Somehow, though, you have data on your topic that is not Avro. I suggest you set up a dead letter queue to route those messages to a new topic for inspection, while letting your sink carry on processing the messages that are Avro.
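A sketch of the extra connector properties for that setup (the registry URL and dead-letter topic name here are placeholders, not values from your environment):

```json
{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081",
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "mvm_test_events_dlq",
  "errors.deadletterqueue.topic.replication.factor": "1",
  "errors.deadletterqueue.context.headers.enable": "true"
}
```

With `errors.tolerance` set to `all`, records that fail conversion are routed to the dead-letter topic instead of killing the task, and the context headers record which stage failed.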
