使用Kafka 2.7.0,我使用MirroMaker 2.0作为Kafka连接连接器,将所有主题从主Kafka集群复制到备份集群。
除了__consumer_offsets
之外,所有的主题都被完美地复制了。以下是连接配置:
{
"name": "test-connector",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
"topics.blacklist": "some-random-topic",
"replication.policy.separator": "",
"source.cluster.alias": "",
"target.cluster.alias": "",
"exclude.internal.topics":"false",
"tasks.max": "10",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"source.cluster.bootstrap.servers": "xx.xx.xxx.xx:9094",
"target.cluster.bootstrap.servers": "yy.yy.yyy.yy:9094",
"topics": "test-topic-from-primary,primary-kafka-connect-offset,primary-kafka-connect-config,primary-kafka-connect-status,__consumer_offsets"
}
}
在这里的一个类似问题中,公认的答案如下:
将其添加到您的consumer.config:中
exclude.internal.topics=false
并将其添加到您的生产商中。config:
客户端.id=__admin_client
在配置中的哪些位置添加这些?
这里连接器配置属性没有名为client.id
的属性,不过我已经将exclude.internal.topics
的值设置为false
。
我这里缺了什么吗?
更新
我了解到Kafka 2.7及更高版本支持使用MirrorCheckpointTask
的自动消费者偏移同步,如上所述。
我已经为此创建了一个连接器,具有以下配置:
{
"name": "mirror-checkpoint-connector",
"config": {
"connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"sync.group.offsets.enabled": "true",
"source.cluster.alias": "",
"target.cluster.alias": "",
"exclude.internal.topics":"false",
"tasks.max": "10",
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"source.cluster.bootstrap.servers": "xx.xx.xxx.xx:9094",
"target.cluster.bootstrap.servers": "yy.yy.yyy.yy:9094",
"topics": "__consumer_offsets"
}
}
还是无济于事。这是正确的方法吗?需要什么吗?
您不希望复制connsumer_offset。由于各种原因,从src到目标集群的偏移量将不相同。
MirrorMaker2提供了进行偏移转换的功能。它将使用src集群生成的转换偏移量填充目标集群。https://cwiki.apache.org/confluence/display/KAFKA/KIP-545%3A+支持+自动化+消费者+偏移+同步+跨+集群+在+MM+2.0
__consumer_offset默认情况下被忽略
topics.exclude = [.*[-.]internal, .*.replica, __.*]
你需要覆盖这个配置