当每个主题中的更新都不相同时,使用KAFKA流中的4个主题加入4个主题的数据



我正在努力处理从SQL数据存储中摄入的数据到Kafka Broker的需求,其中4个不同的主题与SQL数据存储中的4个不同表相对应。我正在使用Kafka Connect将数据摄入主题。

我现在想加入这些主题中的数据并将其汇总并将其写回另一个主题。该主题又将由消费者订阅,以填充NOSQL数据存储,该数据存储将用于渲染UI。

我知道可以使用Kafka流来加入主题。

我的查询是,从SQL数据存储表中获取的数据可能并不总是所有4个表的数据。只有2个表可以定期更新。一个将会更新,但与另一个2。其余的是静态的(一种主表)。

所以,我不确定当记录计数时,我们如何实际加入Kafka流将与主题不匹配时。

有人面临类似的问题。如果是这样,您可以在同一中提供您的想法/代码片段。

行的数量根本无关紧要...为什么它会对加入结果产生任何影响?

您可以将所有4个主题读取为KTable,然后进行加入。最后,您将汇总应用于联接分子KTable,并将最终结果写入主题。这样的东西:

KTable t1 = builder.table("topic1");
KTable t2 = builder.table("topic2");
KTable t3 = builder.table("topic3");
KTable t4 = builder.table("topic4");
KTable joinResult = t1.join(t2, ...).join(t3, ...).join(t4, ...);
joinResult.groupByKey(...).aggregate(...).to("result-topic);

最新更新