MirrorMaker2 - 使用独立连接器自定义重命名主题



我想将MirrorMaker作为独立连接器运行。 到目前为止,我还没有找到有关配置的任何文档。 据我所知,以下内容将复制myTopic.

现在,在目标集群中,我需要主题具有另一个名称foo(而不是自动重命名(。 这是MirrorSourceConnector直接支持的,还是我需要其他方法?

connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
tasksMax = 2
topics = myTopic
source.cluster.bootstrap.servers = sourceHost:9092
target.cluster.bootstrap.servers = sinkHost:9092

所以Kafka Mirror Maker源代码有一个不错的 readme.md。

配置方式因直接运行 MM2 还是在 Kafka Connect 中运行而异。 你直接说,这是在链接 readme.md。

基本上:

默认情况下,复制的主题根据"源集群"重命名 别名":

主题-1 -->源.主题-1

这可以通过覆盖 replication.policy.separator 来自定义 属性(默认值为句点(。如果您需要更好地控制如何 远程主题已定义,可以实现自定义 ReplicationPolicy 和 override replication.policy.class(默认值为 默认复制策略(。

不幸的是,这意味着您无法仅通过配置代码重命名主题。 (类只允许您指定分隔符,而不允许指定其他任何分隔符(。 这可能是因为当您指定要镜像的主题时,您使用的是正则表达式,而不是单个主题名称(即使您的源集群主题配置属性只是主题的名称 - 它仍然被视为正则表达式(。

所以,回到文档:ReplicationPolicy是 Kafka connect 源代码中的 Java 接口,因此您需要实现一个实现ReplicationPolicy的自定义 Java 类,然后在运行 MM2 时确保它位于类路径上。

假设你确实写了这样一个类,你称之为com.moffatt.kafka.connect.mirror.FooReplicationPolicy。 您的类的一个很好的模板是 Kafka Connect:DefaultReplicationPolicy附带的默认(显然是唯一(复制策略类。 您可以看到,构建自己的不会太困难。 您可以轻松添加映射(硬编码或已配置(,用于查找特定配置的主题名称并将其映射到目标主题名称。

您可以通过在配置中将其指定为以下方式来使用新类:replication.policy.class = com.moffatt.kafka.connect.mirror.FooReplicationPolicy

最新更新