Docker容器中的Kafka Connect:未添加连接器



我正在尝试运行一个简单的Kafka Connect容器。我确实尝试了Confluent Connect教程,但设置略有不同(没有docker机器,没有架构注册表(。

目前,我正在使用一个包含Zookeeper和Kafka的Docker compose设置。

version: '3.1'
services:
zookeeper:
image: confluentinc/cp-zookeeper
ports:
- 2181
environment:
- ZOOKEEPER_CLIENT_PORT=2181
- ZOOKEEPER_TICK_TIME=2000
- ZOOKEEPER_SYNC_LIMIT=2
kafka:
image: confluentinc/cp-kafka
depends_on:
- zookeeper
ports:
- 9092
- 9094:9094
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
# setup :9092 for access inside the docker network, 9094 for outside (ie host)
- KAFKA_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://kafka:9094
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://localhost:9094
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_NUM_PARTITIONS=10

这适用于不同的用途,所以我不认为这会成为问题。

现在我正在启动一个KafkaConnect容器,它可以很好地连接到Kafka。我使用以下命令,该命令改编自Connect教程:

docker run -d 
--name=kafka-connect-test 
--net=kafka-connect_default 
--expose 28083 
-p 28083:28083 
-e CONNECT_BOOTSTRAP_SERVERS=kafka:9092 
-e CONNECT_REST_PORT=28083 
-e CONNECT_GROUP_ID="quickstart-test" 
-e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-test-config" 
-e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-test-offsets" 
-e CONNECT_STATUS_STORAGE_TOPIC="quickstart-test-status" 
-e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1 
-e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1 
-e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1 
-e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" 
-e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter" 
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" 
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter" 
-e CONNECT_REST_ADVERTISED_HOST_NAME="localhost" 
-e CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG 
-e CONNECT_PLUGIN_PATH=/usr/share/java/kafka,/etc/kafka-connect/jars 
-v /tmp/quickstart/file:/tmp/quickstart 
-v /tmp/quickstart/jars:/etc/kafka-connect/jars 
confluentinc/cp-kafka-connect:latest

最显著的区别是我使用的是StringConverter,因为我想使用kafkacat插入测试数据。

容器启动良好,正在运行,并且可以在我尝试的所有公开端点上访问。由于我没有添加任何连接器,我查询可用的连接器:

localhost:28083/connector-plugins:

[
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "5.4.0-ccs"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "5.4.0-ccs"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]

因此,就目前而言,我只需要创建一个文件接收器,将数据从主题写入文件就足够了。I POST到localhost:28083/connectors

{ "name": "file-sink", 
"config": { 
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": 1, 
"file": "/test.sink.txt",
"topics": "test-topic"
}
}

并接收CCD_ 5。

但是,当使用GET查询该端点时,我会得到一个空数组作为响应。尝试一下,我还可以将connector.class更改为FileStreamSinkConnector或仅更改为FileStreamSink,并且仍然会得到201(不添加连接器(。

我做错了什么?

为什么当事情明显出了问题时,我会得到"成功"的回应?

问题是:

-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter" 
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter" 

内部转换器只是内部的,并且自从Apache Kafka 2.0版本以来,就被弃用了。如果你在创建连接器后查看Kafka Connect工作日志,你会看到:

ERROR Found configuration for connector 'connector-file-sink' in wrong format: class java.lang.String (org.apache.kafka.connect.storage.KafkaConfigBackingStore)

这是因为Kafka Connect使用Kafka本身作为状态存储,当您创建连接器时,它会将其存储在Kafka主题(CONNECT_CONFIG_STORAGE_TOPIC(上。这默认为JSON,看起来Kafka Connect不喜欢更改它(事实上,没有理由更改它(。

如果您运行与以前相同的Docker命令,但没有两个CONNECT_INTERNAL_转换器行,您会发现一切都很好。

这是正在创建的连接器(我使用PUT而不是POST,因为它是幂等的,更容易重新运行(:

curl -i -X PUT -H  "Content-Type:application/json" 
http://localhost:28083/connectors/file-sink/config 
-d '{
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": 1,
"file": "/test.sink.txt",
"topics": "test-topic"
}'
HTTP/1.1 201 Created
Date: Wed, 11 Mar 2020 09:16:04 GMT
Location: http://localhost:28083/connectors/file-sink
Content-Type: application/json
Content-Length: 211
Server: Jetty(9.4.20.v20190813)
{"name":"file-sink","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector","tasks.max":"1","file":"/test.sink.txt","topics":"test-topic","name":"file-sink"},"tasks":[],"type":"sink"}%

现在检查它是否正在运行(用一些bash的东西来很好地重新格式化它(:

curl -s "http://localhost:28083/connectors?expand=info&expand=status" | 
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | 
column -s : -t| sed 's/"//g'| sort
sink  |  file-sink  |  RUNNING  |  RUNNING  | org.apache.kafka.connect.file.FileStreamSinkConnector

将一些数据发送到主题:

➜ kafkacat -b localhost:9094 -t test-topic -P -K:
1:foo
2:bar

观察Kafka Connect写入的文件中的数据:

➜ docker exec -t kafka-connect-test bash -c 'tail -f /test.sink.txt'
foo
bar

BTW关于:

最显著的区别是我使用StringConverter,因为我想使用kafkacat插入测试数据。

请注意,您可以将每个连接器的转换器设置为配置的一部分;在worker(即全局(级别设置StringConverter可能不是一个好主意,因为您很少使用它,当然是用于值。


有关Kafka Connect的更多信息,请查看:

  • 用Kafka Connect从零到英雄
  • Kafka Connect Deep Dive——转换器和序列化解释

相关内容

  • 没有找到相关文章

最新更新