在kafka-mysql源连接器中使用什么属性来注册任何模式更改的新版本



这是schema-registry.properties 的配置

listeners=http://10.X.X.76:8081
kafkastore.bootstrap.servers=PLAINTEXT://10.XXX:9092,PLAINTEXT://10.XXX:9092,PLAINTEXT://10.XXXX.1:9092,PLAINTEXT://1XXXX.69:9092
kafkastore.topic=_schemas
debug=false
master.eligibility=true

这是我的连接器的配置

{
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"snapshot.locking.mode": "minimal",
"database.user": "cdc_user",
"tasks.max": "3",
"database.history.kafka.bootstrap.servers": "10.49.115.249:9092,10.48.130.211:9092,10.54.178.121:9092,10.53.4.69:9092",
"database.history.kafka.topic": "history.cdc.fkw.supply.mp.seller_facility",
"database.server.name": "cdc.fkw.supply.mp",
"heartbeat.interval.ms": "5000",
"database.port": "3306",
"table.whitelist": "seller_facility.addresses, seller_facility.location, seller_facility.default_location, seller_facility.location_document_mapping",
"database.hostname": "dog-rr.ffb-supply-ffb-supply-mp.prod.altair.fkcloud.in",
"database.password": "6X5DpJrVzI",
"database.history.kafka.recovery.poll.interval.ms": "5000",
"name": "cdc.fkw.supply.mp.seller_facility.connector",
"database.history.skip.unparseable.ddl": "true",
"errors.tolerance": "all",
"database.whitelist": "seller_facility",
"snapshot.mode": "when_needed"
}

当模式发生任何更改时,如何注册新的模式?我可以添加什么属性来做到这一点,这样它就可以为该特定主题向模式注册表添加一个新版本,并且是完全兼容的。

假设您的key/value.converter正在使用其中一个Confluent数据库列,例如AvroConverter,则Connect框架将自动拾取任何新的/已删除的数据库列,并将其注册到注册表中,作为KafkaAvroSerializer进程中序列化的一部分。

更改数据库列类型可能会产生错误,例如,将VARCHAR更改为INT

相关内容

  • 没有找到相关文章

最新更新