我想添加一个新的Debezium连接器。假设我想知道DB字段Entity.status何时更改为";取消";。我能做的就是像这样使用groovy:
"transforms.filter.language": "jsr223.groovy"
然后我可以添加这种filer条件:
"transforms.filter.condition": "value.before.status != value.after.status && value.after.status == 'canceled'"
另一种方法是将每个更改的记录发送到我的Kafka处理程序,并在那里进行过滤:
public void onMessage(EntityChangedMessage entityChangedMessage) {
if(entityChangedMessage.getAfter().getStatus() == entityChangedMessage.getBefore().getStatus() ||
entityChangedMessage.getAfter().getStatus() != "canceled") {
return;
}
// my logic for when I meet the condition I care for...
}
第二种方法是让我(或任何其他关心它的开发人员(使用事件用于其他流和目的,即使是与状态字段无关的流和目的。另一方面,第二种方法有点混乱。我很乐意听取关于哪种方法更好的意见。
TL.DR CDC环境中的常见模式是发布所有消息,然后在目的地进行筛选。
详细信息:
正如你已经提到的,这是基于意见的。这里有一件事你可能想考虑
CDC和变更捕获被广泛用于记录系统的审计目的。这实现了记录生命周期中状态的可追溯性和跟踪变化(可以是任何事情,例如创建一个新帐户,其状态从新的、挂起的、批准的和取消的(
在大多数这样的场景中,您需要所有这些状态更改。可能存在可能只想要特定消息的应用程序的子集,例如;取消";帐户,在这种情况下,他们可能会过滤掉其余的消息。
这样做的缺点是什么,假设我们平均有10万个新帐户,而只有100个取消,这意味着绝大多数消息都需要丢弃。可能存在无法扩展的应用程序(例如,可能是不是为这么大的容量而构建的旧应用程序(。同样,有一些方法可以通过使用可扩展的路由器来解决这个问题,该路由器将取消的消息路由到可以消费的特定主题。