我在分布式模式下设置了kafka-connect集群,我想获得与多个kafka-CLUSTERS的连接,而不仅仅是多个代理。可以在connect-distributed.properties
中使用bootstrap.servers
设置目标代理。所以,一开始,我从kafka-cluster-A设置broker1,如下所示:
bootstrap.servers=broker1:9092
当然,效果很好。
然后,我添加了来自kafka-cluster-B的broker2,如下所示:
bootstrap.servers=broker1:9092,broker2:9092
因此,这两个经纪人属于不同的集群。但这根本不起作用。在没有任何错误的情况下,它只是被卡住了,并且对于通过RESTneneneba API创建连接器之类的请求没有任何答案。
如何连接多个kafka集群?
据我所知,您只能将一个KafkaConnect工作进程连接到一个Kavka集群。
如果您有不同集群上的数据要用Kafka Connect处理,那么运行多个Kafka Connection工作进程。
如果您在源集群(而不是目标集群(上运行Connect,则需要以下内容:
- 在connect-distributed-worker.properties中将
bootstrap.servers
设置为源服务器 - 通过Connect REST API启动实际镜像:
curl --noproxy "*" -XPOST -H 'Content-Type: application/json' -H 'Accept: application/json' http://localhost:8083/connectors -d'{
"name": "MirrorSourceConnector",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"replication.factor": "1",
"source.cluster.alias": "site1",
"target.cluster.alias": "cloud",
"source.cluster.bootstrap.servers": "localhost:29092",
"target.cluster.bootstrap.servers": "localhost:49092",
"topics": ".*",
"emit.checkpoints.interval.seconds": "1",
"emit.heartbeats.interval.seconds": "1",
"sync.topic.acls.enabled": "false",
"refresh.topics.interval.seconds": "4",
"producer.override.bootstrap.servers":"localhost:49092"
}
}'
到目前为止,最重要的是:
";生产者.覆盖.引导.服务器":"本地主机:49092">
这实际上导致源->目标复制工作。我不知道为什么"target.cluster.bootstrap.servers": "localhost:49092"
不能正常工作。但这个解决方案是有效的。如果没有这个片段,复制将只复制源集群上带有"的主题;站点1";前缀,并且不对目标执行任何操作。
OSS文档中没有记录这一点,我在这篇博客文章以及这个Confluent页面上都提到了这一点;在源集群上运行复制程序";
(此解决方案在Apache Kafka Connect 3.2.0和/或cp-kafka:7.2.1
Docker镜像上进行了测试(