Using the Kafka Connect HDFS Sink, I am able to write Avro data to a Kafka topic and save the data in Hive/HDFS.
I am trying to save the data in Parquet file format using the format class
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
quickstart-hdfs.properties is as follows:
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=test_hdfs_parquet
hdfs.url=hdfs://localhost:9000
flush.size=3
hive.metastore.uris=thrift://10.15.167.109:9083
hive.integration=true
schema.compatibility=BACKWARD
partitioner.class=io.confluent.connect.hdfs.partitioner.HourlyPartitioner
locale=en-us
timezone=UTC
key.converter.schemas.enable=false
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
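
For reference, I publish the Avro test data with a plain Java producer along these lines (a minimal sketch: the bootstrap server address and the record schema here are placeholders, not my exact ones):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class AvroTestProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed broker address
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Same Schema Registry the sink's value.converter points at
        props.put("schema.registry.url", "http://localhost:8081");

        // Placeholder schema for illustration; the real records use my own schema
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"TestRecord\","
              + "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}");

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("f1", "value1");
            producer.send(new ProducerRecord<>("test_hdfs_parquet", record));
            producer.flush();
        }
    }
}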
When I publish data to Kafka, the table is created in Hive and the test_hdfs_parquet directory is created in HDFS, but the sink fails to save the data in Parquet format due to the following exception:
java.lang.NoSuchMethodError: org.apache.avro.Schema.getLogicalType()Lorg/apache/avro/LogicalType;
at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:178)
at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:130)
at org.apache.parquet.avro.AvroSchemaConverter.convertField(AvroSchemaConverter.java:227)
at org.apache.parquet.avro.AvroSchemaConverter.convertFields(AvroSchemaConverter.java:124)
at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:115)
at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:144)
at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)
at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:68)
at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:635)
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:101)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2018-03-13 11:48:41,148] ERROR WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
You seem to be running into this issue.
The solution mentioned there is to use the Avro 1.7.7 libraries, which means replacing not only parquet-avro but the other Avro jars as well.
Alternatively, you can try compiling hdfs-connect from source and updating all of the jars.
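
If you want to confirm which Avro version the Connect worker is actually loading, a quick reflection check like this can help (a sketch; run it with the same classpath as your Connect worker, e.g. via java -cp with the connector's jars):

import org.apache.avro.Schema;
import java.net.URL;

public class AvroVersionCheck {
    public static void main(String[] args) {
        // Report which jar the Avro Schema class was actually loaded from
        URL avroJar = Schema.class.getProtectionDomain()
                .getCodeSource().getLocation();
        System.out.println("Avro loaded from: " + avroJar);
        try {
            // Schema.getLogicalType() only exists in Avro 1.8+
            Schema.class.getMethod("getLogicalType");
            System.out.println("getLogicalType() present -> Avro 1.8+");
        } catch (NoSuchMethodException e) {
            // A 1.7.x Avro jar on the classpath is what makes parquet-avro
            // throw the NoSuchMethodError shown in the question
            System.out.println("getLogicalType() missing -> Avro 1.7.x");
        }
    }
}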