Debezium Postgres和ElasticSearch-在ElasticSearch中存储复杂对象



我在Postgres中有一个带有表的数据库"产品";它与1到n相连;sales_Channel";。因此,一个产品可以有多个销售渠道。现在我想把它转移到ES并保持最新,所以我使用debezium和kafka。将单个表转移到ES是没有问题的。我可以查询SalesChannels和Products。但我需要附带所有销售渠道的产品作为结果。我怎样才能把这个转移出去?

产品映射

{
"settings": {
"number_of_shards": 1
},
"mappings": {
"_doc": {
"properties": {
"id": {
"type": "integer"
}
}
}
}
}

产品水槽

{
"name": "es-sink-product",
"config": {
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "product",
"connection.url": "http://elasticsearch:9200",
"transforms": "unwrap,key",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"transforms.unwrap.drop.tombstones": "false",
"transforms.unwrap.drop.deletes": "false",
"transforms.key.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.key.field": "id",
"key.ignore": "false",
"type.name": "_doc",
"behavior.on.null.values": "delete"
}
}

您需要使用Outbox模式,请参阅https://debezium.io/documentation/reference/1.2/configuration/outbox-event-router.html

或者可以使用聚合对象,请参见https://github.com/debezium/debezium-examples/tree/master/jpa-aggregationshttps://github.com/debezium/debezium-examples/tree/master/kstreams-fk-join

最新更新