Hard delete events on Kafka Connect to sync databases not working or giving an error



I have connected a Postgres database to a MySQL database to keep them in sync.

Create and update events work fine on the sink, but when I delete a row on the source (not just the data in a column), I get an error.

I have tried a few things, but with no luck.

1 - When I don't put "createKey" and "extractInt" under "transforms" on my MySQL sink, I get an error and the bigserial column is not created:

"BLOB/TEXT column 'id_consultor' used in key specification without a key length"

2 - But if I do set "createKey" and "extractInt" in my configuration, create and update work fine, but this error appears on delete events:

"Only Map objects supported in absence of schema for [copying fields from value to key], found: null"

"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"id_consultor",   
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "id_consultor"

3 - If I put this on my source (Postgres):

**"transforms.unwrap.delete.handling.mode":"rewrite"**

the delete only partially works, as a "soft delete": it does not erase the row, it just wipes all the data and leaves the non-nullable fields filled with 0.
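From the Debezium docs, rewrite keeps the delete record alive and just adds a __deleted field, so an upsert sink writes it back as a mostly empty row instead of deleting it, which matches what I see. If I understand it correctly, a hard delete needs the tombstone to reach the JDBC sink (that is what delete.enabled with pk.mode record_key reacts to), so the unwrap transform should drop the rewritten delete event and keep only the tombstone; something like this (untested):

"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode": "drop"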

Can someone help me? Thanks.

Postgres connector:

"name": "postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "**",
"database.port": "5432",
"database.user": "**",
"database.password": "**",
"database.dbname" : "**",
"database.server.name": "kafkaPostgres",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "history",
"schema.include.list": "public",
"table.include.list": "public.consultor",
"time.precision.mode": "connect",
"tombstones.on.delete": "true",
"plugin.name": "pgoutput",
"transforms": "unwrap, dropPrefix",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.unwrap.add.fields": "table,lsn",
"transforms.unwrap.add.headers": "db",
"transforms.dropPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
"transforms.dropPrefix.regex":"kafkaPostgres.public.(.*)",
"transforms.dropPrefix.replacement":"$1"

MySQL sink:

"name": "mysql-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "consultor",
"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"connection.url": "**,
"connection.user":"**",
"connection.password":"**",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"dialect.name": "MySqlDatabaseDialect",
"Database Dialect": "MySqlDatabaseDialect",
"table.name.format": "consultor",
"pk.mode": "record_key",
"pk.fields": "id_consultor",
"delete.enabled": "true",
"drop.invalid.message": "true",
"delete.retention.ms": 1,
"fields.whitelist": "id_consultor, idempresaorganizacional, cd_consultor_cpf, dt_consultor_nascimento , ds_justificativa, nn_consultor , cd_consultor_rg, id_motivo, id_situacao , id_sub_motivo",
"transforms": "unwrap, flatten, route, createKey, extractInt ",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.delete.handling.mode":"rewrite",
"transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
"transforms.flatten.delimiter": ".",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "(?:[^.]+)\.(?:[^.]+)\.([^.]+)",
"transforms.route.replacement": "$1",
"transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields":"id_consultor",   
"transforms.extractInt.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractInt.field": "id_consultor"

I added the following properties on the connector:

"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url" :"http://apicurio:8080/api",
"key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url":"http://apicurio:8080/api",
"value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",

and replaced this on the sink:

"key.converter":"org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",

with this:

"key.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"key.converter.apicurio.registry.url" :"http://apicurio:8080/api",
"key.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",
"value.converter": "io.apicurio.registry.utils.converter.AvroConverter",
"value.converter.apicurio.registry.url":"http://apicurio:8080/api",
"value.converter.apicurio.registry.global-id": "io.apicurio.registry.utils.serde.strategy.GetOrCreateIdStrategy",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true",

Everything works now, but I can't read the messages in the topic, because I'm using the debezium-kafka image and it has no Avro console consumer.

Right now I'm trying some plugins for this image to be able to read the Avro messages.

I hope someone can help.
