我使用的是Confluent 3.3.0。我使用jdbc-source-connector
将消息从Oracle表插入到Kafka中。这很好用
我想看看是否可以"追加销售"。
我的意思是,如果我有一个学生表,它有3列id
(数字(、name
(varchar2(和last_modified
(时间戳(。每当我插入新行时,它都会被推送到Kafka(使用时间戳+自动递增字段(。但当我更新行时,Kafka中的相应消息应该会更新。
我的表的id
应该成为相应Kafka消息的key
。我的主键(id(将作为引用保持不变
每次更新行时,时间戳字段都会更新。
这可能吗?或者删除Kafka中的现有记录并插入新记录。
但是当我更新行时,Kafka中的相应消息应该更新
这是不可能的,因为Kafka在设计上只是附加的,并且是不可变的。
最好的方法是通过某个last_modified
列查询所有行,或者挂接一个CDC解决方案,如Oracle GoldenGate或alpha Debezium解决方案,该解决方案将捕获数据库上的单个UPDATE事件,并在Kafka主题上附加一个全新的记录。
如果你想在Kafka中对数据库记录进行重复数据消除(在一个时间窗口内找到最大last_modified
的消息(,你可以使用Kafka Streams或KSQL来执行这种类型的后处理过滤。
如果您使用压缩的Kafka主题,并插入您的数据库密钥作为Kafka消息密钥,那么在压缩之后,最新附加的消息将持续存在,并且具有相同密钥的前一条消息将被删除,而不是更新