I have installed the docker image confluentinc/cp-kafka-connect:4.0.0 in my VM. I want to get plain-text (string-format) data from a Kafka topic into HDFS.
sudo docker run -d \
  --name=kafka-connect \
  --net=host \
  -e CONNECT_BOOTSTRAP_SERVERS=<kafka-server-host>:9092 \
  -e CONNECT_REST_PORT=8082 \
  -e CONNECT_GROUP_ID="connect-kafkac1" \
  -e CONNECT_CONFIG_STORAGE_TOPIC="connect-kafkac1-config" \
  -e CONNECT_OFFSET_STORAGE_TOPIC="connect-kafkac1-offsets" \
  -e CONNECT_STATUS_STORAGE_TOPIC="connect-kafkac1-status" \
  -e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.json.JsonConverter" \
  -e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" \
  -e CONNECT_LOG4J_LOGGERS=org.reflections=ERROR \
  -e CONNECT_PLUGIN_PATH=/usr/share/java \
  confluentinc/cp-kafka-connect:4.0.0
I have completed the following configuration files.
/etc/kafka/connect-standalone.properties
bootstrap.servers=kafkaclusteraddress:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
/etc/kafka-connect-hdfs/quickstart-hdfs.properties
name=hdfs-sink
connector.class=io.confluent.connect.hdfs.HdfsSinkConnector
tasks.max=1
topics=<topicname>
topics.dir=<hdfs-topic-dir>
logs.dir=<hdfs-logs-dir>
hdfs.url=<hdfs-url>:8020
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat
flush.size=3
hadoop.conf.dir=/etc/hadoop/conf
hadoop.home=/usr/bin/hadoop
This is the error I get after running the configuration in standalone mode:
# connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka-connect-hdfs/quickstart-hdfs.properties
----------------------------------------
[2018-02-13 19:11:09,542] ERROR WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
java.lang.IllegalArgumentException: Avro schema must be a record.
at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:113)
at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:144)
at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)
at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:68)
at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:635)
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:101)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
[2018-02-13 19:11:09,545] ERROR WorkerSinkTask{id=hdfs-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: Avro schema must be a record.
at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:113)
at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:144)
at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)
at io.confluent.connect.hdfs.parquet.ParquetRecordWriterProvider$1.write(ParquetRecordWriterProvider.java:68)
at io.confluent.connect.hdfs.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:635)
at io.confluent.connect.hdfs.TopicPartitionWriter.write(TopicPartitionWriter.java:379)
at io.confluent.connect.hdfs.DataWriter.write(DataWriter.java:374)
at io.confluent.connect.hdfs.HdfsSinkTask.put(HdfsSinkTask.java:101)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
... 10 more
[2018-02-13 19:11:09,545] ERROR WorkerSinkTask{id=hdfs-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
If you look at the stacktrace, Avro data is expected rather than a plain string.
IllegalArgumentException: Avro schema must be a record.
at org.apache.parquet.avro.AvroSchemaConverter.convert(AvroSchemaConverter.java:113)
at org.apache.parquet.avro.AvroParquetWriter.writeSupport(AvroParquetWriter.java:144)
at org.apache.parquet.avro.AvroParquetWriter.<init>(AvroParquetWriter.java:106)
I haven't personally tested JSON to Parquet, but being a columnar data format, ParquetFormat requires you to have a schema. Since you have schemas.enable=false, that is not the case, and therefore it cannot consume plain strings, ints, booleans, or other "unstructured" Connect API types.
These are the settings that need to change; one possible replacement is sketched below.
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
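If you stay with JSON, one possible replacement for those four lines (a sketch only, not something I have run against your setup) is to switch the worker to the JsonConverter with schemas enabled:
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true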
In order for the JsonConverter to work with schemas.enable=true, you then have to produce data that looks like {"schema": {...}, "payload": {...}}, where the schema field contains the type definition of the object in the payload field.
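As a rough illustration, a message matching that envelope could look like the JSON below. The struct and field names are placeholders I made up; also note that for ParquetFormat the schema still has to describe a struct (the error above says the Avro schema must be a record), so an envelope with a bare string schema would not be enough.
{
  "schema": {
    "type": "struct",
    "name": "TextLine",
    "optional": false,
    "fields": [
      { "field": "line", "type": "string", "optional": false }
    ]
  },
  "payload": {
    "line": "some plain text value"
  }
}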
Otherwise, if you have control over the producer code, you should send Avro data instead, which requires using the Schema Registry.
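For the Avro route, the worker converters would instead point at the Schema Registry, roughly like this (host and port are placeholders):
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://<schema-registry-host>:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://<schema-registry-host>:8081
The producer would then serialize its records with an Avro serializer registered against that same Schema Registry.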
I'd also like to mention that since you only have a single string per record, Parquet won't give you much benefit over raw text files in HDFS, because there would only be one column anyway.