在Kafka-connectS3连接器中处理数组内为Null的消息



我使用带有两个连接器的Kafka连接:

  1. debezium将数据从Postgres拉到Kafka
  2. 用于将数据从Kafka保存到S3的S3连接器

运行时,我从S3连接器收到此错误

java.lang.NullPointerException: Array contains a null element at 0 

我发现了相关的消息,其中包含以下内容:

"some_key": [
"XCVB",
null
]

如何处理此消息?

我已经尝试将以下内容添加到S3连接器配置中:

"behavior.on.null.values": "ignore",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name":"dlq_s3_sink"

尝试跳过这些消息并将其发送到DLQ,但它似乎不起作用,并且由于此错误导致任务失败。我在日志中也看到了这一点:

Set parquet.avro.write-old-list-structure=false to turn on support for arrays with null elements.

但不确定我应该在哪里添加这个?作为连接器配置的一部分?

在接收器连接器配置中添加parquet.avro.write-old-list-structure:false

也可使用10.1.0或更高版本
参考:https://docs.confluent.io/kafka-connectors/s3-sink/current/changelog.html#version-10-1-0:~:text=S3%20客户端%20错误-,PR%2D485%20%2D%20CMSG%2D1531%3A%20支持%20null%20项目%20内部%20数组%20使用%20Parquet%20writer,-PR%2D475%20%2D/20CCMSG

相关内容

  • 没有找到相关文章

最新更新