Kafka Connect HDFS Sink saving data in Parquet format


Using the Kafka Connect HDFS Sink, I am able to write Avro data to a Kafka topic and save the data in Hive/HDFS. I am trying to save the data in the Parquet file format using the format class:

format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

My quickstart-hdfs.properties is as follows:

name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs_parquet
hdfs.url=hdfs://localhost:9000
flush.size=3
hive.metastore.uris=thrift://10.15.167.109:9083
hive.integration=true
schema.compatibility=BACKWARD
partitioner.class=io.confluent.connect.hdfs.partitioner.HourlyPartitioner
locale=en-us
timezone=UTC
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

When I publish data to Kafka, the table is created in Hive and a test_hdfs_parquet directory is created in HDFS, but the sink fails to save the data in Parquet format because of the following exception:

java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;
        at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:178)
        at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
        at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:227)
        at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:124)
        at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:115)
        at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:144)
        at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)
        at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:68)
        at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:635)
        at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
        at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
        at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:101)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2018-03-13 11:48:41,148] ERROR WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

It looks like you have run into this issue.

The solution mentioned there is to use the Avro 1.7.7 libraries, meaning not only parquet-avro but the other Avro JARs as well.
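To confirm which Avro version the Connect worker actually loads, you can run a quick reflection check on the same classpath (a minimal sketch of my own; the class name AvroVersionCheck is made up, and Schema.getLogicalType() was introduced in Avro 1.8):

import org.apache.avro.Schema;

// Minimal diagnostic (my own sketch, not part of the connector): run it with
// the same classpath as the Connect worker to see which JAR provides
// org.apache.avro.Schema and whether the method parquet-avro calls exists.
public class AvroVersionCheck {
    public static void main(String[] args) {
        System.out.println("Avro loaded from: "
                + Schema.class.getProtectionDomain().getCodeSource().getLocation());
        try {
            Schema.class.getMethod("getLogicalType");
            System.out.println("Schema.getLogicalType() is present (Avro 1.8+)");
        } catch (NoSuchMethodException e) {
            // An older Avro (1.7.x) shadows the one parquet-avro was compiled
            // against; this is the same mismatch behind the NoSuchMethodError
            // thrown in the sink task.
            System.out.println("Schema.getLogicalType() is missing (Avro 1.7.x)");
        }
    }
}

If the method is reported missing, an older Avro JAR is taking precedence over the one parquet-avro expects, which matches the stack trace above.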

Alternatively, you can try compiling hdfs-connect from source and updating all of the JARs.
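If you go the rebuild route, one way to keep everything on a single Avro version is to pin it in the POM (a hypothetical Maven sketch; adjust the version to whatever the issue thread recommends):

<!-- Hypothetical sketch: force one Avro version so that parquet-avro
     and the rest of the build agree on the same API. -->
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>1.7.7</version>
    </dependency>
  </dependencies>
</dependencyManagement>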
