如何在第二个主题数据延迟到达时合并两个kafka主题数据



从mysql数据库中获得两个不同表的两个kafka主题。

表1-交易数据

表2-交易明细数据

现在,我需要合并这两个kafka主题(又名mysql表(的数据,并将其作为一个文档推送到Mongo-Db。

虽然我可以使用kafka流来做同样的事情,但需要如何处理以下情况的建议

情况1-当表1数据到达但不是表2数据时

情况2-当表2数据到达但不是表1数据时

将数据临时存储在Windowed键值存储中。

当数据从流1到达时:查看流2的匹配数据是否可用。如果是,请组合数据并存储在MongoDB中。如果不是,则将流1的数据存储在窗口存储中。

数据从流2到达时:查看流1的匹配数据是否可用。如果是这样,请组合数据并将其存储在MongoDB中。如果没有,则将流2的数据存储在窗口存储中。

KafkaStreams中窗口存储的默认实现是每个分区一个RocksDB实例。要做到这一点,您必须确保两个流具有相同的分区。

这正是卡夫卡流在KStream背后所做的。加入(KStream,…(:

KStream<String, String> joined = left.join(right,
(leftValue, rightValue) -> combine(leftValue, rightValue),
JoinWindows.of(...),
Joined.with(...)
);

连接窗口的大小通常是有限的,以避免数据保持无限长。限制应该是来自不同流的数据的不同到达时间之间的最大差异。

最新更新