在Kafka Connect中,如何连接多个Kafka集群



我在分布式模式下设置了kafka-connect集群,我想获得与多个kafka-CLUSTERS的连接,而不仅仅是多个代理。可以在connect-distributed.properties中使用bootstrap.servers设置目标代理。所以,一开始,我从kafka-cluster-A设置broker1,如下所示:

bootstrap.servers=broker1:9092

当然,效果很好。

然后,我添加了来自kafka-cluster-B的broker2,如下所示:

bootstrap.servers=broker1:9092,broker2:9092

因此,这两个经纪人属于不同的集群。但这根本不起作用。在没有任何错误的情况下,它只是被卡住了,并且对于通过RESTneneneba API创建连接器之类的请求没有任何答案。

如何连接多个kafka集群?

据我所知,您只能将一个KafkaConnect工作进程连接到一个Kavka集群。

如果您有不同集群上的数据要用Kafka Connect处理,那么运行多个Kafka Connection工作进程。

如果您在源集群(而不是目标集群(上运行Connect,则需要以下内容:

  1. 在connect-distributed-worker.properties中将bootstrap.servers设置为源服务器
  2. 通过Connect REST API启动实际镜像:
curl --noproxy "*" -XPOST -H 'Content-Type: application/json' -H 'Accept: application/json' http://localhost:8083/connectors -d'{
"name": "MirrorSourceConnector",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"replication.factor": "1",
"source.cluster.alias": "site1",
"target.cluster.alias": "cloud",
"source.cluster.bootstrap.servers": "localhost:29092",
"target.cluster.bootstrap.servers": "localhost:49092",
"topics": ".*",
"emit.checkpoints.interval.seconds": "1",
"emit.heartbeats.interval.seconds": "1",
"sync.topic.acls.enabled": "false",
"refresh.topics.interval.seconds": "4",
"producer.override.bootstrap.servers":"localhost:49092"
}
}'

到目前为止,最重要的是:

";生产者.覆盖.引导.服务器":"本地主机:49092">

这实际上导致源->目标复制工作。我不知道为什么"target.cluster.bootstrap.servers": "localhost:49092"不能正常工作。但这个解决方案是有效的。如果没有这个片段,复制将只复制源集群上带有"的主题;站点1";前缀,并且不对目标执行任何操作。

OSS文档中没有记录这一点,我在这篇博客文章以及这个Confluent页面上都提到了这一点;在源集群上运行复制程序";

(此解决方案在Apache Kafka Connect 3.2.0和/或cp-kafka:7.2.1Docker镜像上进行了测试(

相关内容

  • 没有找到相关文章