我配置了一个Kafka JDBC Source连接器,以便在Kafka主题上推送PostgreSQL数据库中更改(插入或更新(的记录。我使用";时间戳+递增";模式看起来效果不错。我没有配置JDBC接收器连接器,因为我正在使用一个Kafka Consumer来监听这个主题。
主题中的消息是一个JSON。这是一个例子:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "int64",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "entity_create_date"
},
{
"type": "int64",
"optional": true,
"name": "org.apache.kafka.connect.data.Timestamp",
"version": 1,
"field": "entity_modify_date"
},
{
"type": "int32",
"optional": true,
"field": "entity_version"
},
{
"type": "string",
"optional": true,
"field": "firstname"
},
{
"type": "string",
"optional": true,
"field": "lastname"
}
],
"optional": false,
"name": "author"
},
"payload": {
"id": 1,
"entity_create_date": 1600287236682,
"entity_modify_date": 1600287236682,
"entity_version": 1,
"firstname": "George",
"lastname": "Orwell"
}
}
正如您所看到的,没有关于源连接器是否因为插入或更新而捕获此更改的信息。我需要这些信息。如何解决?
除非在源模式和触发器中进行定制,否则无法使用JDBC源连接器获取这些信息。
这就是为什么基于日志的CDC通常是从源数据库获取事件的更好方法的原因之一,其他原因包括:
- 捕获删除
- 捕获操作类型
- 捕获所有事件,而不仅仅是连接器轮询时的事件
有关这方面细微差别的更多细节,请参阅本博客或基于此的演讲。
使用@Robin Moffatt建议的基于CDC的方法可能是处理您的需求的正确方法。结账https://debezium.io/
然而,查看您的表数据,您可以使用";entity_create_date";以及";entity_modify_date";在您的使用者中,以确定消息是否在插入或更新中。如果";entity_create_date"="entity_modify_date";然后是插入,否则是更新。