Kafka HDFS Sink Connector error [Top level type must be STRUCT]

I am testing Kafka version 2.7 with Kafka Connect, and I am facing an issue I don't understand.

I first started a distributed Connect worker with the following configuration:

bootstrap.servers=..:9092,...:9092, ...
group.id=kafka-connect-test
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
... some internal topic configuration
plugin.path=<plugin path>

This worker listens on port 8083.

I want to write data to HDFS in ORC format with the snappy codec, so I created a new Kafka HDFS sink connector through the REST API with the JSON payload below. I am not using a Schema Registry.

curl -X POST <connector url:8083> \
  -H "Accept: application/json" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "hdfs-sinkconnect-test",
    "config": {
      "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
      "store.url": "hdfs:~",
      "hadoop.conf.dir": "<my hadoop.conf dir>",
      "hadoop.home": "<hadoop home dir>",
      "tasks.max": "5",
      "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
      "format.class": "io.confluent.connect.hdfs.orc.OrcFormat",
      "flush.size": 1000,
      "avro.codec": "snappy",
      "topics": "<topic name>",
      "topics.dir": "/tmp/connect-logs",
      "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
      "locale": "ko_KR",
      "timezone": "Asia/Seoul",
      "partition.duration.ms": "3600000",
      "path.format": "'\''hour'\''=YYYYMMddHH/"
    }
  }'

Then I get this error message:

# connectDistributed.out
[2021-06-28 17:14:11,596] ERROR Exception on topic partition <topic name>-<partition number>:  (io.confluent.connect.hdfs.TopicPartitionWriter:409)
org.apache.kafka.connect.errors.ConnectException: Top level type must be STRUCT but was bytes
at io.confluent.connect.hdfs.orc.OrcRecordWriterProvider$1.write(OrcRecordWriterProvider.java:98)
at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:742)
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:385)
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:333)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:126)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:329)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

I think this error message is related to schema information. Is a Schema Registry essential for Kafka Connect?
Any ideas or solutions to resolve this error? Thanks.

Writing ORC files requires a Struct type.

The options Confluent provides are plain JSON, JSONSchema, Avro, or Protobuf. The only option that does not require a registry is the plain JsonConverter.
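
For example, with JsonConverter and schemas enabled, every record value has to carry its own schema envelope so that Connect can build a Struct from it. A minimal sketch of such a message (the field names "id" and "ts" are invented for illustration):

{
  "schema": {
    "type": "struct",
    "name": "record",
    "optional": false,
    "fields": [
      {"field": "id", "type": "string", "optional": false},
      {"field": "ts", "type": "int64", "optional": true}
    ]
  },
  "payload": {"id": "abc-123", "ts": 1624867200000}
}

JsonConverter deserializes this into a Connect Struct, which is what the ORC writer expects at the top level.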

Note that key.deserializer and value.deserializer are not valid Connect properties. You need to set your key.converter and value.converter properties instead.
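
A minimal sketch of that fix, assuming your producers actually write schema-bearing JSON like the example above: drop the two deserializer entries from the connector's "config" block and override the worker's ByteArrayConverter there instead:

"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true"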

If you are unwilling to change the converters, you can try using the HoistField transform to create a Struct; this will produce an ORC file whose schema has a single field.
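
A minimal sketch of that approach, added to the same "config" block; the field name "line" is an arbitrary choice and becomes the one column of the resulting ORC schema:

"transforms": "HoistField",
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.HoistField.field": "line"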
