在Confluent Cloud中使用Debezium和KSQLDB时,如何处理tombstone和Struct类型键



我已经使用KSQLDB创建了一个Debezium Kafka连接器。

每次删除表中的一行时,Debezium都会发送一个如下的tombstone(f.ex(:

KEY: Struct(cliente_cod=0000) | BODY: null

当我在表中具体化一行(使用KSQLDB(时,我有以下列(f.ex(:

ID: 0000 | NAME: xxxx | SURNAME: xxxx

如果不进行任何转换,逻辑删除Struct(cliente_cod=0000)中的id和表0000中的id将不匹配,因此不会删除该行。显然,我们可以将Struct(cliente...)存储为表的id,但如果需要与其他表进行联接,则可能会出现问题。

通过流重新密钥(使用PARTITION BY重新密钥,f.ex(,tombstone将被忽略,因为null不是有效的内容(流对tombstone一无所知;它只是物化视图的一个概念(。

一个好的解决方案可能是添加转换(这里是上一个例子,在KSQLDB连接器定义中(:

"transforms.extractClienteKey.type" = 'org.apache.kafka.connect.transforms.ExtractField$Key',
"transforms.extractClienteKey.field" = 'cliente_cod',
"transforms.extractClienteKey.predicate" = 'IsClienteTopic',

这很好,而且有效;tombstone将转换为(NoStruct(:

KEY: 0000 | BODY: null

当您的数据库中有许多具有不同主键名称的表时;假设您有30个表的PK名称,如client_iduser_id等。在这种情况下,为了使用ExtractField$Key,您需要按主题进行区分,并对每个主题应用不同的转换。

这也有效,问题是当你试图通过Confluent Cloud中的连接器运行10个以上的转换时(该服务限制为10个(。

我的问题是:

  1. 有没有一种方法可以配置Debezium(或任何kafka-connect(连接器,在不应用转换的情况下发送0000而不是Struct(id=0000)
  2. 处理Debezium tombstone和KSQLDB表的正确方法是什么?转型是唯一的办法吗?有其他选择吗

经过一些研究,目前似乎没有任何可行的变压器替代品可以在连接器级别做到这一点。如果你在Confluent Cloud中遇到同样的问题,你能做的最好的事情可能是与Confluent支持团队联系,以尝试增加转换限制。

最新更新