我使用模式时间戳+递增的JDBC源连接器,使用Kafka Connect从Postgres获取表。数据的更新反映在Kafka主题中,但删除记录没有效果。所以,我的问题是:
- 有什么方法可以处理删除的记录吗
- 如何处理kafka主题中已删除但仍存在的记录
建议1(将源数据库调整为仅追加/更新,也可以通过布尔值或时间戳进行调整,当Kafka Connect查询表时,该时间戳会被过滤掉。
如果你的数据库空间不足,那么你可以删除旧记录,这些记录应该已经被Kafka 处理过了
选项2(使用CDC工具立即捕获删除事件,而不是在周期表扫描中丢失它们。Debezium是Postgres 的热门选项
卡夫卡主题可以被看作是一个"仅追加";日志它可以让所有的会议保持你喜欢的时间,但卡夫卡并不是为了删除主题中的个别信息而构建的。
在您描述的场景中,下游应用程序(使用主题(处理已删除记录上的信息是很常见的。
作为替代方案,您可以将主题的cleanup.policy
设置为compact
,这意味着它最终将只保留每个键的最新值。如果您现在将消息的键定义为Postgres表的主键,那么当您在主题中生成具有相同键和null
值的消息时,您的主题最终将删除该记录。然而,
-
我不确定你的连接器是否可以灵活地进行
-
根据您对kafka主题中的数据所做的处理,这可能仍然不能解决您的问题,因为下游应用程序仍将同时读取记录、原始记录和
null
消息作为已删除的记录。