我是Kafka Streams的新手,我尝试创建一个poc,看看它是否适合我的用例。
我有一个主题,我正在制作一些参考数据。然后,这些数据被流式传输并转换为GlobalKTable CPK(我使用了GlobalKTable,因为我需要在非密钥上加入(。一旦完成此处理。然后,我开始填充另一个主题,然后流式传输(SPT(数据,并在CPK上进行内部联接,以生成另一个GlobalKTable(JTK(。
CPK和SPT都是来自外部系统的馈送。
现在我有了实时数据,我需要查找刚刚填充的参考数据。假设这个流被称为"真实"。Real随后与JTK进行了内部联合,我们实际上取得了很好的结果。
问题是当我需要从CPK中删除一行时。我传递了一个空值的键,希望它从CPK中删除这个值,并将更改传播到JTK。因此,任何带有该密钥的JTK记录都应该被删除。但这并没有发生。
这可行吗?我的想法正确吗?我应该使用KSQL吗?
提前感谢大家。
感谢@clicket_007的建议。我使用了KSQL,现在一切正常。所以基本上,我首先通过加入JTK创建了一个流,JTK现在是一个带有SPT的流(而不是以前的GKTable(。让我们将其命名为Joined Stream。之后,我又创建了一个流,名为resultby.joining Joined with CPK,这是一个表。(我在KSQL中没有找到GKTable概念(