I'm using Debezium on a MySQL table to capture change logs into Kafka. The Kafka Connect configuration is as follows:
{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "xxxx",
"database.password": "xxxx",
"database.server.id": "42",
"database.server.name": "xxxx",
"table.whitelist": "demo.movies",
"database.history.kafka.bootstrap.servers": "broker:9092",
"database.history.kafka.topic": "dbhistory.demo" ,
"decimal.handling.mode": "double",
"include.schema.changes": "true",
"transforms": "unwrap,dropTopicPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropTopicPrefix.regex":"asgard.demo.(.*)",
"transforms.dropTopicPrefix.replacement":"$1",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081"
}
However, it is sending all of the existing (old) records in the table to the Kafka topic.
Is there a way to read only the new change log data?
The default behaviour is to snapshot the table (read all existing data) and then stream new changes.
To read only new changes, add "snapshot.mode": "schema_only" to the connector configuration.
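As a minimal sketch, this is how the setting could be applied when registering the connector through the Kafka Connect REST API. The connector name "source-mysql-movies" and the REST host "localhost:8083" are assumptions (they are not given in the question); the rest of the configuration is copied from it, with the placeholder credentials left as-is:

# Register/update the connector with a data snapshot disabled (schema-only snapshot).
# Connector name and Connect REST host are assumed, not taken from the question.
curl -X PUT http://localhost:8083/connectors/source-mysql-movies/config \
     -H "Content-Type: application/json" \
     -d '{
           "connector.class": "io.debezium.connector.mysql.MySqlConnector",
           "database.hostname": "mysql",
           "database.port": "3306",
           "database.user": "xxxx",
           "database.password": "xxxx",
           "database.server.id": "42",
           "database.server.name": "xxxx",
           "table.whitelist": "demo.movies",
           "database.history.kafka.bootstrap.servers": "broker:9092",
           "database.history.kafka.topic": "dbhistory.demo",
           "decimal.handling.mode": "double",
           "include.schema.changes": "true",
           "snapshot.mode": "schema_only",
           "transforms": "unwrap,dropTopicPrefix",
           "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
           "transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
           "transforms.dropTopicPrefix.regex": "asgard.demo.(.*)",
           "transforms.dropTopicPrefix.replacement": "$1",
           "key.converter": "io.confluent.connect.avro.AvroConverter",
           "key.converter.schema.registry.url": "http://schema-registry:8081",
           "value.converter": "io.confluent.connect.avro.AvroConverter",
           "value.converter.schema.registry.url": "http://schema-registry:8081"
         }'

Note that the snapshot mode is only consulted when the connector starts with no previously recorded offsets, so if your existing connector has already completed its initial snapshot you would likely need to register it under a new name (or clear its offsets) for the setting to take effect.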