没有重复记录的卡夫卡批处理使用者



>我有以下要求,我们正在从具有CDC在特定表上插入/更新的关系数据库中读取,并将其作为事件导入到Kafka主题中。

例如 JDBC-源-主题

|---------------------|------------------|------------------|
|      Timestamp      |        ID        |      Column      |
|---------------------|------------------|------------------|
|        10:00        |         1        |         A        |
|---------------------|------------------|------------------|
|        10:01        |         2        |         B        |
|---------------------|------------------|------------------|
|        10:01        |         1        |         C        |
|---------------------|------------------|------------------|

在管道结束时,我们希望每天使用一次这些事件,并避免同一 ID 出现重复。

例如目标主题

|---------------------|------------------|------------------|
|      Timestamp      |        ID        |      Column      |
|---------------------|------------------|------------------|
|        10:01        |         2        |         B        |
|---------------------|------------------|------------------|
|        10:01        |         1        |         C        |
|---------------------|------------------|------------------|

在我看来,最好的解决方案是让一个消费者运行一次group_id(这样偏移量就存储在 kafka 中以供第二天使用(。但这意味着每次使用者运行时,它都必须从获取的记录中删除重复项。

考虑到此表将来也可能用于 KSQL Joins,我想知道是否存在使用 KSQL 查询的更好方法,以便消费者从清理的主题中获取每个键一条记录。

如果此数据的唯一使用者是 ksqlDB,那么您可能不需要取消重复数据,因为如果您在 ksql 中将主题作为 TABLE 导入,ksqlDB 将正确处理对同一键的多个更新,即而不是执行以下操作:

CREATE STREAM FOO (... columns ...) WITH (...);

做:

CREATE TABLE FOO (... columns ...) WITH (...);

目前,当 ksqlDB 处理此类更改日志时,它将输出所有/部分重复项,具体取决于您如何配置cache.max.bytes.buffering

您可以通过使用 24 小时窗口和即将推出的抑制支持来避免发出重复项。 在此之前,如果您想按照建议删除重复项。您还可以通过编写自己的 Kafka Streams 应用程序将表具体化为状态存储并使用抑制 API 删除重复项来获得某些工作。

但是,值得指出的是,从语义上讲,重复项不会引起任何问题。将更改日志具体化为表的结果是 使用和不使用重复项进行更正。 因此,正如我在开始时所说,删除重复项甚至可能没有必要。

最新更新