How do I add the HDFS connector to the Kafka Connect API?

I am new to Kafka Connect and am struggling with how to configure Kafka and HDFS using it.

I have been following the tutorial on the Debezium website, where I could trigger new events and see how the system works. Partway through, the tutorial explains how to create a connector between MySQL and Kafka, and I tried to do the same thing, but for HDFS.

I did some research online and ran the following command:

```bash
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  localhost:8083/connectors/ -d '{
    "name": "hdfs-sink",
    "config": {
      "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
      "tasks.max": 1,
      "topics": "dbserver1,dbserver1.inventory.products,dbserver1.inventory.products_on_hand,dbserver1.inventory.customers,dbserver1.inventory.orders",
      "hdfs.url": "hdfs://172.18.0.2:9870",
      "flush.size": 3,
      "logs.dir": "logs",
      "topics.dir": "kafka",
      "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
      "partitioner.class": "io.confluent.connect.hdfs.partitioner.DefaultPartitioner",
      "partition.field.name": "day"
    }
  }'
```

In this command I added the topics that Kafka auto-generated, and for the URL I used the IP address of the namenode container (I am not sure whether that is correct). In general I am still just testing, but the end goal here is to get every event into HDFS.

{"error_code":500,"message":"Failed to find any class that implements Connector and which name matches io.confluent.connect.hdfs.HdfsSinkConnector, available connectors are: PluginDesc{klass=class io.confluent.connect.activemq.ActiveMQSourceConnector, name='io.confluent.connect.activemq.ActiveMQSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-activemq/'}, PluginDesc{klass=class io.confluent.connect.elasticsearch.ElasticsearchSinkConnector, name='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector', version='5.3.1', encodedVersion=5.3.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-elasticsearch/'}, PluginDesc{klass=class io.confluent.connect.gcs.GcsSinkConnector, name='io.confluent.connect.gcs.GcsSinkConnector', version='5.0.3', encodedVersion=5.0.3, type=sink, typeName='sink', location='file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-gcs/'}, PluginDesc{klass=class io.confluent.connect.ibm.mq.IbmMQSourceConnector, name='io.confluent.connect.ibm.mq.IbmMQSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-ibmmq/'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSinkConnector, name='io.confluent.connect.jdbc.JdbcSinkConnector', version='5.3.1', encodedVersion=5.3.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-jdbc/'}, PluginDesc{klass=class io.confluent.connect.jdbc.JdbcSourceConnector, name='io.confluent.connect.jdbc.JdbcSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-jdbc/'}, PluginDesc{klass=class io.confluent.connect.jms.JmsSourceConnector, name='io.confluent.connect.jms.JmsSourceConnector', version='5.3.1', encodedVersion=5.3.1, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-activemq/'}, PluginDesc{klass=class io.confluent.connect.s3.S3SinkConnector, name='io.confluent.connect.s3.S3SinkConnector', version='5.3.1', encodedVersion=5.3.1, type=sink, typeName='sink', location='file:/usr/share/java/kafka-connect-s3/'}, PluginDesc{klass=class io.confluent.connect.storage.tools.SchemaSourceConnector, name='io.confluent.connect.storage.tools.SchemaSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka-connect-s3/'}, PluginDesc{klass=class io.confluent.kafka.connect.datagen.DatagenConnector, name='io.confluent.kafka.connect.datagen.DatagenConnector', version='null', encodedVersion=null, type=source, typeName='source', location='file:/usr/share/confluent-hub-components/confluentinc-kafka-connect-datagen/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSinkConnector, name='org.apache.kafka.connect.file.FileStreamSinkConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=sink, typeName='sink', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.file.FileStreamSourceConnector, name='org.apache.kafka.connect.file.FileStreamSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockConnector, name='org.apache.kafka.connect.tools.MockConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=connector, typeName='connector', 
location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSinkConnector, name='org.apache.kafka.connect.tools.MockSinkConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=sink, typeName='sink', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.MockSourceConnector, name='org.apache.kafka.connect.tools.MockSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.SchemaSourceConnector, name='org.apache.kafka.connect.tools.SchemaSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSinkConnector, name='org.apache.kafka.connect.tools.VerifiableSinkConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}, PluginDesc{klass=class org.apache.kafka.connect.tools.VerifiableSourceConnector, name='org.apache.kafka.connect.tools.VerifiableSourceConnector', version='5.3.1-ccs', encodedVersion=5.3.1-ccs, type=source, typeName='source', location='file:/usr/share/java/kafka/'}"}

This is the error the terminal returns. I believe the HDFS plugin is not installed correctly (I followed many examples online, but I am still not sure it is installed properly).

Is it actually necessary to use this plugin from Confluent?

I also don't know whether running HDFS from Docker is a good idea.

I hope you can share some knowledge about this problem. Thanks in advance.

Tutorial link: https://debezium.io/documentation/reference/1.0/tutorial.html

The HDFS 2 Sink Connector is deprecated and has been removed from the Confluent Platform installation.

You can still find it and install it from Confluent Hub, though I would suggest you use the official Apache Kafka website to learn the core of Kafka Connect rather than the Confluent documentation.
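
If you do want the connector, a minimal sketch of installing it with the Confluent Hub client (this assumes the `confluent-hub` CLI is available on your Connect worker; pin a concrete version tag if you prefer):

```bash
# Pull the HDFS 2 sink connector from Confluent Hub into the worker's
# plugin path, then restart the Connect worker so it loads the new plugin.
confluent-hub install confluentinc/kafka-connect-hdfs:latest
```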

Welcome to StackOverflow!

There is a problem with the plugin installation. So first, check it using the Kafka Connect REST interface (see details here). Then you can install the connector manually.
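
For example, you can ask the worker which plugins it actually loaded; a quick check, assuming the REST API is on localhost:8083 as in your curl command:

```bash
# List every connector plugin this Connect worker has on its plugin path;
# io.confluent.connect.hdfs.HdfsSinkConnector must appear here before the
# POST to /connectors/ can succeed.
curl -s localhost:8083/connector-plugins

# List the connectors that are currently created on this worker.
curl -s localhost:8083/connectors
```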

Check this link to Confluent's documentation for the HDFS 2 Sink Connector: https://docs.confluent.io/current/connect/kafka-connect-hdfs/index.html

It covers almost everything, and if you are using the Confluent Platform it will also help you figure out how to fix this.
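
One more thing worth checking once the plugin loads: 9870 is normally the NameNode web UI port (Hadoop 3.x), while hdfs.url should point at the NameNode RPC address, typically port 8020. A sketch of the corrected request, with the IP, port, and single topic as assumptions for your setup:

```bash
# Same sink config as in the question, but hdfs.url pointing at the
# NameNode RPC port (8020 by default; match fs.defaultFS in your cluster).
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  localhost:8083/connectors/ -d '{
    "name": "hdfs-sink",
    "config": {
      "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
      "tasks.max": 1,
      "topics": "dbserver1.inventory.customers",
      "hdfs.url": "hdfs://172.18.0.2:8020",
      "flush.size": 3,
      "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
      "partitioner.class": "io.confluent.connect.hdfs.partitioner.DefaultPartitioner"
    }
  }'
```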

A Docker image is a reliable way to run HDFS. Use this image: https://hub.docker.com/r/sequenceiq/hadoop-docker/
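
Per that image's README, you can start a single-node cluster roughly like this (2.7.1 is the tag documented there; check the page for current tags):

```bash
# Boot a single-node Hadoop container; the bootstrap script starts HDFS
# and YARN, then drops you into a shell inside the container.
docker run -it sequenceiq/hadoop-docker:2.7.1 /etc/bootstrap.sh -bash
```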
