Kafka MirrorMaker2 - 不镜像消费者组偏移量



我已经设置了MirrorMaker2,用于在2个DC之间复制数据。

我的mm2属性:

# mm2.properties
name=source->dest
clusters=source, dest
source.bootstrap.servers=localhost:9091
dest.bootstrap.servers=localhost:9092
source->dest.enabled=true
offset.storage.partitions=2
config.storage.replication.factor=1
status.storage.replication.factor=1

在MM2启动时看到以下内容:

[2020-02-16 07:31:07,547] INFO MirrorConnectorConfig values: 
admin.timeout.ms = 60000
checkpoints.topic.replication.factor = 3
config.action.reload = restart
config.properties.blacklist = [follower.replication.throttled.replicas, leader.replication.throttled.replicas, message.timestamp.difference.max.ms, message.timestamp.type, unclean.leader.election.enable, min.insync.replicas]
config.property.filter.class = class org.apache.kafka.connect.mirror.DefaultConfigPropertyFilter
connector.class = org.apache.kafka.connect.mirror.MirrorCheckpointConnector
consumer.poll.timeout.ms = 1000
emit.checkpoints.enabled = true
emit.checkpoints.interval.seconds = 60
emit.heartbeats.enabled = true
emit.heartbeats.interval.seconds = 1
enabled = true
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
group.filter.class = class org.apache.kafka.connect.mirror.DefaultGroupFilter
groups = [.*]
groups.blacklist = [console-consumer-.*, connect-.*, __.*]
header.converter = null
heartbeats.topic.replication.factor = 3
key.converter = null
metric.reporters = null
name = source->dest
offset-syncs.topic.replication.factor = 3
offset.lag.max = 100
refresh.groups.enabled = true
refresh.groups.interval.seconds = 600
refresh.topics.enabled = true
refresh.topics.interval.seconds = 600
replication.factor = 2
replication.policy.class = class org.apache.kafka.connect.mirror.DefaultReplicationPolicy
replication.policy.separator = .
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
source.cluster.alias = source
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
sync.topic.acls.enabled = true
sync.topic.acls.interval.seconds = 600
sync.topic.configs.enabled = true
sync.topic.configs.interval.seconds = 600
target.cluster.alias = dest
task.assigned.groups = null
task.assigned.partitions = null
tasks.max = 1
topic.filter.class = class org.apache.kafka.connect.mirror.DefaultTopicFilter
topics = [.*]
topics.blacklist = [.*[-.]internal, .*.replica, __.*]
transforms = []
value.converter = null
(org.apache.kafka.connect.mirror.MirrorConnectorConfig:347)

我的数据正在按预期复制。源主题在目标集群中创建为source.<TOPIC>。但是,不会复制使用者组偏移量。

已在源集群中启动消费组。

./kafka-console-consumer.sh --bootstrap-server localhost:9091 --topic test-1 --group test-1-group

消耗了很少的消息并停止了它。在本主题中发布了新消息,镜像制作者还将数据镜像到目标群集。

我尝试使用来自目标集群的消息,如下所示。

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic source.test-1 --group test-1-group

由于我使用相同的使用者组,因此我希望我的偏移量也会同步,并且不会使用我在集群1中使用的相同消息。但是,仍然使用所有消息。我在这里缺少什么吗?

复制偏移量不平凡有几个基本原因:

  1. Kafka是一个至少一次的系统(忽略炒作(。 这意味着镜像生成器,因为它建立在可以超时/断开连接的 Kafka 使用者和生产者之上,将导致某种程度的重复记录被传送到目标。 这意味着偏移量不会在源和目标之间映射 1:1。 即使您尝试使用"恰好一次"支持(MM2 KIP明确表示不使用(的所有内容可以跳过部分交付的批次,但这些批次仍将占用目的地的偏移量
  2. 如果在源主题
  3. 开始过期记录很久之后设置镜像,则目标主题将从偏移量 0 开始,而源主题的"最旧"偏移量要高得多。 有人试图解决这个问题(见KIP-391(,但从未被接受
  4. 通常,无法保证镜像拓扑从单个源镜像到单个目标。 例如,LinkedIn拓扑从多个源群集镜像到"聚合"层群集中。 映射偏移对于此类拓扑毫无意义

看看MM2 KIP有一个"偏移同步主题"。 在代码中,您可以使用类 RemoteClusterUtils 在集群之间转换检查点:

Map<TopicPartition, Long> newOffsets = RemoteClusterUtils.translateOffsets(
newClusterProperties, oldClusterName, consumerGroupId
);
consumer.seek(newOffsets);

这取自以下演示文稿 - https://www.slideshare.net/ConfluentInc/disaster-recovery-with-mirrormaker-20-ryanne-dolan-cloudera-kafka-summit-london-2019

或者,您可以使用 Timespamp API 在目标上启动使用者组,直到数据传送到目标的粗略时间(或传送到源,如果目标上的日志追加时间戳的代理设置不会覆盖这些时间(。 为了安全起见,您需要倒带一点。

Kafka 2.7引入了"自动消费者偏移同步"。 默认情况下,使用者偏移不会在集群之间同步。 应显式启用此功能。

在 MM 2.0 中支持跨集群的自动使用者偏移同步

我的数据正在按预期复制。源主题作为源在目标集群中创建。但是,不会复制使用者组偏移量。

默认情况下,MM2 不会从kafka-console-consumer复制使用者组。在启动时的MM2日志中,我们可以看到groups.blacklist = [console-consumer-.*, connect-.*, __.*]。我相信您可以在mm2.properties配置文件中覆盖它。

由于我使用相同的使用者组,因此我希望我的偏移量也会同步,并且不会使用我在 cluster1 中使用的相同消息。

正确镜像使用者组并启用检查点后,应该会在目标集群中自动创建一个内部主题(类似于dest.checkpoints.internal(。此检查点主题包含每个使用者组中镜像主题分区的源集群和目标集群中上次提交的偏移量。

然后,您可以使用 Kafka 的 RemoteClusterUtils 实用程序类来转换这些偏移量,并获取映射到使用者上次提交的偏移量test-1source.test-1的同步偏移量。如果您最终使用 Java 创建了一个使用者,则可以将RemoteClusterUtils作为依赖项添加到项目中:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-mirror-client</artifactId>
<version>2.4.0</version>
</dependency>

否则,您可能必须编写一个包装RemoteClusterUtils.java的工具才能获得转换后的偏移量。此功能或类似功能似乎计划作为 MM2 未来版本的一部分。

我看到您在检查点上的配置是

emit.checkpoints.enabled = true 
emit.checkpoints.interval.seconds = 60 

因此,您的检查点主题仅在 60 秒后反映新的更改。如果您立即尝试,它将不起作用,因此,请在 1 分钟后尝试。

最新更新