我有以下情况,我有一个Apache Kafka主题包含许多记录类型。
例如:
- UserCreated
- UserUpdated
- UserDeleted
- AnotherRecordType
- …
我希望在列出的三个User*
记录类型上实现CDC,以便在最后,我有一个包含所有用户信息的最新KTable。
如何在ksqlDB中做到这一点?因为,据我所知,Debezium和其他CDC连接器也从单个主题获取数据,我至少知道这应该是可能的。
我已经通读Confluent文档有一段时间了,但我似乎找不到任何与我的用例(CDC使用现有主题)相关的东西。如果有什么我忽略了,我将非常感谢相关文档的链接以及。
我假设,至少记录必须具有相同的键,以便sqldb能够匹配它们。所以我的问题归结为:
- 我该如何教sqldb这是一个插入,更新和删除?
- 键匹配是硬要求,还是我们可以使用其他连接/匹配谓词?
我能想到的一种可能性基本上是CDC已经做到的:将每个传入的记录视为一个新条目,这样我就可以在KTable中拥有一个缓慢变化的维度,对键进行分组并选择带有例如最新时间戳的条目。
那么,就像下面这样:
CREATE TABLE users AS
SELECT user.user_id,
latest_by_offset(user.name) AS name,
latest_by_offset(user.email),
CASE WHEN record.key = UserDeleted THEN true ELSE FALSE END,
user.timestamp,
...
FROM users
GROUP BY user.user_id
EMIT CHANGES;
可能(例如用ROWKEY
代替record.key
)?如果不是,那么Debezium是如何做到的呢?
一般模式是没有不同的模式类型;只有User
。然后,插入任何唯一键(例如userid)的第一个记录。之后,同一键的任何非空值都被更新(通常要求所有字段都是值的一部分,有效地执行"替换";表中的操作)。删除是由为键发送空值(tombstone事件)引起的。
如果您有多个模式,最好创建一个新的流,该流将删除任何删除事件,将创建和更新统一到您想要的信息的公共模式,并过滤您想要忽略的事件类型。
例如Debezium是如何做到的?
对于使用来自Debezium主题的数据,您可以使用转换来"提取新的记录状态"。它不会为您创建任何表。