我正在尝试运行一个简单的Kafka Connect容器。我确实尝试了Confluent Connect教程,但设置略有不同(没有docker机器,没有架构注册表(。
目前,我正在使用一个包含Zookeeper和Kafka的Docker compose设置。
version: '3.1'
services:
zookeeper:
image: confluentinc/cp-zookeeper
ports:
- 2181
environment:
- ZOOKEEPER_CLIENT_PORT=2181
- ZOOKEEPER_TICK_TIME=2000
- ZOOKEEPER_SYNC_LIMIT=2
kafka:
image: confluentinc/cp-kafka
depends_on:
- zookeeper
ports:
- 9092
- 9094:9094
environment:
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
# setup :9092 for access inside the docker network, 9094 for outside (ie host)
- KAFKA_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://kafka:9094
- KAFKA_ADVERTISED_LISTENERS=INTERNAL://kafka:9092,OUTSIDE://localhost:9094
- KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
- KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL
- KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1
- KAFKA_NUM_PARTITIONS=10
这适用于不同的用途,所以我不认为这会成为问题。
现在我正在启动一个KafkaConnect容器,它可以很好地连接到Kafka。我使用以下命令,该命令改编自Connect教程:
docker run -d
--name=kafka-connect-test
--net=kafka-connect_default
--expose 28083
-p 28083:28083
-e CONNECT_BOOTSTRAP_SERVERS=kafka:9092
-e CONNECT_REST_PORT=28083
-e CONNECT_GROUP_ID="quickstart-test"
-e CONNECT_CONFIG_STORAGE_TOPIC="quickstart-test-config"
-e CONNECT_OFFSET_STORAGE_TOPIC="quickstart-test-offsets"
-e CONNECT_STATUS_STORAGE_TOPIC="quickstart-test-status"
-e CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR=1
-e CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR=1
-e CONNECT_STATUS_STORAGE_REPLICATION_FACTOR=1
-e CONNECT_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter"
-e CONNECT_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter"
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter"
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter"
-e CONNECT_REST_ADVERTISED_HOST_NAME="localhost"
-e CONNECT_LOG4J_ROOT_LOGLEVEL=DEBUG
-e CONNECT_PLUGIN_PATH=/usr/share/java/kafka,/etc/kafka-connect/jars
-v /tmp/quickstart/file:/tmp/quickstart
-v /tmp/quickstart/jars:/etc/kafka-connect/jars
confluentinc/cp-kafka-connect:latest
最显著的区别是我使用的是StringConverter
,因为我想使用kafkacat
插入测试数据。
容器启动良好,正在运行,并且可以在我尝试的所有公开端点上访问。由于我没有添加任何连接器,我查询可用的连接器:
localhost:28083/connector-plugins
:
[
{
"class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"type": "sink",
"version": "5.4.0-ccs"
},
{
"class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
"type": "source",
"version": "5.4.0-ccs"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type": "source",
"version": "1"
},
{
"class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type": "source",
"version": "1"
}
]
因此,就目前而言,我只需要创建一个文件接收器,将数据从主题写入文件就足够了。I POST到localhost:28083/connectors
{ "name": "file-sink",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": 1,
"file": "/test.sink.txt",
"topics": "test-topic"
}
}
并接收CCD_ 5。
但是,当使用GET
查询该端点时,我会得到一个空数组作为响应。尝试一下,我还可以将connector.class
更改为FileStreamSinkConnector
或仅更改为FileStreamSink
,并且仍然会得到201
(不添加连接器(。
我做错了什么?
为什么当事情明显出了问题时,我会得到"成功"的回应?
问题是:
-e CONNECT_INTERNAL_KEY_CONVERTER="org.apache.kafka.connect.storage.StringConverter"
-e CONNECT_INTERNAL_VALUE_CONVERTER="org.apache.kafka.connect.storage.StringConverter"
内部转换器只是内部的,并且自从Apache Kafka 2.0版本以来,就被弃用了。如果你在创建连接器后查看Kafka Connect工作日志,你会看到:
ERROR Found configuration for connector 'connector-file-sink' in wrong format: class java.lang.String (org.apache.kafka.connect.storage.KafkaConfigBackingStore)
这是因为Kafka Connect使用Kafka本身作为状态存储,当您创建连接器时,它会将其存储在Kafka主题(CONNECT_CONFIG_STORAGE_TOPIC
(上。这默认为JSON,看起来Kafka Connect不喜欢更改它(事实上,没有理由更改它(。
如果您运行与以前相同的Docker命令,但没有两个CONNECT_INTERNAL_
转换器行,您会发现一切都很好。
这是正在创建的连接器(我使用PUT
而不是POST
,因为它是幂等的,更容易重新运行(:
curl -i -X PUT -H "Content-Type:application/json"
http://localhost:28083/connectors/file-sink/config
-d '{
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"tasks.max": 1,
"file": "/test.sink.txt",
"topics": "test-topic"
}'
HTTP/1.1 201 Created
Date: Wed, 11 Mar 2020 09:16:04 GMT
Location: http://localhost:28083/connectors/file-sink
Content-Type: application/json
Content-Length: 211
Server: Jetty(9.4.20.v20190813)
{"name":"file-sink","config":{"connector.class":"org.apache.kafka.connect.file.FileStreamSinkConnector","tasks.max":"1","file":"/test.sink.txt","topics":"test-topic","name":"file-sink"},"tasks":[],"type":"sink"}%
现在检查它是否正在运行(用一些bash的东西来很好地重新格式化它(:
curl -s "http://localhost:28083/connectors?expand=info&expand=status" |
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' |
column -s : -t| sed 's/"//g'| sort
sink | file-sink | RUNNING | RUNNING | org.apache.kafka.connect.file.FileStreamSinkConnector
将一些数据发送到主题:
➜ kafkacat -b localhost:9094 -t test-topic -P -K:
1:foo
2:bar
观察Kafka Connect写入的文件中的数据:
➜ docker exec -t kafka-connect-test bash -c 'tail -f /test.sink.txt'
foo
bar
BTW关于:
最显著的区别是我使用
StringConverter
,因为我想使用kafkacat插入测试数据。
请注意,您可以将每个连接器的转换器设置为配置的一部分;在worker(即全局(级别设置StringConverter可能不是一个好主意,因为您很少使用它,当然是用于值。
有关Kafka Connect的更多信息,请查看:
- 用Kafka Connect从零到英雄
- Kafka Connect Deep Dive——转换器和序列化解释