文档在kafka流中没有被删除



我正在尝试同步我的PostgreSQL数据库到elasticsearch。但遇到一些困难的删除记录。

这里有一些关于我想要达到的目标的信息。

  1. 通过kafka connect (debezium postgres连接器)获取所有表到kafka主题。每个表都有自己的主题。
  2. 对它们进行流处理。像阅读kafka主题作为ktable,使用SpecificAvroSerde,并加入他们得到一个完整的文档与嵌入的信息。
  3. 将join的结果写入输出主题

这是我的源配置:

{
"name": "pg-source-1",
"config": {
"slot.name" : "debezium",
"database.server.name": "cdc",
"slot.drop_on_stop": true,
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname": "postgres",
"schema.whitelist": "my_schema",
"override.message.max.bytes": "524288000",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable": "false",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"database.history.kafka.topic": "schema-changes.my_schema
}
}

现在我有3个表格,产品(PK id),product_category(只有FKs,一个引用产品。id和其他到category.id),category(PK id)这里的product_category是一个桥接表。当我删除产品和类别之间的关系时例如product_category中的记录表格它没有反映在ES端。

这些都是有效的方法:

  1. 产品更新产品名称表。
  2. product_category创建新条目表。

这是ES接收器连接器配置:

{
"name": "es-sink-1",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "output-topic",
"connection.url": "http://elasticsearch:9200",
"type.name": "_doc",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.ignore": false,
"transforms": "extract",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "id",
"transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extract.field": "id",
"behavior.on.null.values" : "delete"
}
}

这是Streams应用程序的配置。

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-application-1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 500);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

我点击了这个链接。
如果需要更多的信息,请告诉我。谢谢:)

在查看了debezium的文档后,我能够修复这个问题。

对于一个消费者能够处理为一个没有主键的表生成的删除事件,将表的复制id设置为FULL。当一个表没有主键,并且表的复制id被设置为DEFAULT或NOTHING时,delete事件没有before字段。

因此,我只需要将桥接表的副本标识更改为FULL,因为它没有PKs,只有fk。

我在上面的配置中还做了一个配置更改:

"transforms.unwrap.drop.tombstones":false,

在此之后,我开始接收带有键(用于删除记录)和空值的事件,只是必须在流应用程序中遵守这一点。

相关内容

  • 没有找到相关文章

最新更新