是否可以使用 Kafka Connect 在 Kafka 中"upsert"消息?



我使用的是Confluent 3.3.0。我使用jdbc-source-connector将消息从Oracle表插入到Kafka中。这很好用
我想看看是否可以"追加销售"。

我的意思是,如果我有一个学生表,它有3列id(数字(、name(varchar2(和last_modified(时间戳(。每当我插入新行时,它都会被推送到Kafka(使用时间戳+自动递增字段(。但当我更新行时,Kafka中的相应消息应该会更新。

我的表的id应该成为相应Kafka消息的key。我的主键(id(将作为引用保持不变
每次更新行时,时间戳字段都会更新。

这可能吗?或者删除Kafka中的现有记录并插入新记录。

但是当我更新行时,Kafka中的相应消息应该更新

这是不可能的,因为Kafka在设计上只是附加的,并且是不可变的。

最好的方法是通过某个last_modified列查询所有行,或者挂接一个CDC解决方案,如Oracle GoldenGate或alpha Debezium解决方案,该解决方案将捕获数据库上的单个UPDATE事件,并在Kafka主题上附加一个全新的记录。

如果你想在Kafka中对数据库记录进行重复数据消除(在一个时间窗口内找到最大last_modified的消息(,你可以使用Kafka Streams或KSQL来执行这种类型的后处理过滤。

如果您使用压缩的Kafka主题,并插入您的数据库密钥作为Kafka消息密钥,那么在压缩之后,最新附加的消息将持续存在,并且具有相同密钥的前一条消息将被删除,而不是更新

相关内容

  • 没有找到相关文章

最新更新