我从最初的Kafka MirrorMaker迁移到MirrorMaker 2.0,用于将主题从一个集群复制到另一个集群。我正在运行一个专用的MirrorMaker集群,如文档中所述。
假设我正在复制一个名为test-topic
的主题。
Cluster A Cluster B
---------- ----------
test-topic ---> A.test-topic
如何确定A.test-topic
落后test-topic
多远
最初的MirrorMaker创建了使用者组,所以我提到了该使用者组的滞后。MirrorMaker 2.0没有创建使用者组,所以我不能用它来确定滞后。
我也有同样的用例。在MM2中,他们使用consumer.assign((而不是consumer.subscribe((来使用源集群
由于assign不需要任何groupId,因此我们无法使用使用者组来获取滞后。
作为一种变通方法,我做了以下事情:
- 调度程序将定期运行,例如每15分钟运行一次
- 它将获取源集群主题和目标集群主题的日志结束偏移量
- 我们可以比较这两者,并根据滞后,我们可以配置警报
为了找到对数端偏移量,我们可以为主题创建一个消费者,并可以使用consumer.seakToEnd来获取该位置。此外,您需要确保在这个流程中必须跳过内部主题。
从KIP-382,有一个度量来查找复制的记录数。但这不是计算滞后的直接方法。https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0#KIP382:MirrorMaker2.0-公共接口
您不能直接获得滞后。MM2在目标集群{MM2 offets}中创建内部主题。source-cluster-name.internal。MM2在此处提交偏移。消息密钥为{topic}-{partition}和值是偏移的。一种方法是检查源主题的日志结束偏移量,并将其与内部主题中为该分区提交的偏移量进行比较。