我使用带有两个连接器的Kafka连接:
- debezium将数据从Postgres拉到Kafka
- 用于将数据从Kafka保存到S3的S3连接器
运行时,我从S3连接器收到此错误
java.lang.NullPointerException: Array contains a null element at 0
我发现了相关的消息,其中包含以下内容:
"some_key": [
"XCVB",
null
]
如何处理此消息?
我已经尝试将以下内容添加到S3连接器配置中:
"behavior.on.null.values": "ignore",
"errors.tolerance": "all",
"errors.deadletterqueue.topic.name":"dlq_s3_sink"
尝试跳过这些消息并将其发送到DLQ,但它似乎不起作用,并且由于此错误导致任务失败。我在日志中也看到了这一点:
Set parquet.avro.write-old-list-structure=false to turn on support for arrays with null elements.
但不确定我应该在哪里添加这个?作为连接器配置的一部分?
在接收器连接器配置中添加parquet.avro.write-old-list-structure:false
。
也可使用10.1.0或更高版本
参考:https://docs.confluent.io/kafka-connectors/s3-sink/current/changelog.html#version-10-1-0:~:text=S3%20客户端%20错误-,PR%2D485%20%2D%20CMSG%2D1531%3A%20支持%20null%20项目%20内部%20数组%20使用%20Parquet%20writer,-PR%2D475%20%2D/20CCMSG