在Kafka中,如何处理源表中已经反映在Kafk主题中的已删除行



我使用模式时间戳+递增的JDBC源连接器,使用Kafka Connect从Postgres获取表。数据的更新反映在Kafka主题中,但删除记录没有效果。所以,我的问题是:

  1. 有什么方法可以处理删除的记录吗
  2. 如何处理kafka主题中已删除但仍存在的记录

建议1(将源数据库调整为仅追加/更新,也可以通过布尔值或时间戳进行调整,当Kafka Connect查询表时,该时间戳会被过滤掉。

如果你的数据库空间不足,那么你可以删除旧记录,这些记录应该已经被Kafka 处理过了

选项2(使用CDC工具立即捕获删除事件,而不是在周期表扫描中丢失它们。Debezium是Postgres 的热门选项

卡夫卡主题可以被看作是一个"仅追加";日志它可以让所有的会议保持你喜欢的时间,但卡夫卡并不是为了删除主题中的个别信息而构建的。

在您描述的场景中,下游应用程序(使用主题(处理已删除记录上的信息是很常见的。

作为替代方案,您可以将主题的cleanup.policy设置为compact,这意味着它最终将只保留每个键的最新值。如果您现在将消息的键定义为Postgres表的主键,那么当您在主题中生成具有相同键和null值的消息时,您的主题最终将删除该记录。然而,

  1. 我不确定你的连接器是否可以灵活地进行

  2. 根据您对kafka主题中的数据所做的处理,这可能仍然不能解决您的问题,因为下游应用程序仍将同时读取记录、原始记录和null消息作为已删除的记录。

相关内容

  • 没有找到相关文章

最新更新