对intervalJoin感到困惑



我正试图想出一个解决方案,该解决方案涉及在联接操作之后应用一些逻辑,以便在多个EventBs中从streamB中选择一个事件。这就像一个reduce函数,但它只返回1个元素,而不是增量执行。因此,最终结果将是单个(EventAEventB(对,而不是1个EventA和多个EventB的叉积。

streamA
.keyBy((a: EventA) => a.common_key)
.intervalJoin(
streamB
.keyBy((b: EventB) => b.common_key)
)
.between(Time.days(-30), Time.days(0))
.process(new MyJoinFunction)

数据将被摄取如下(假设它们有相同的密钥(:

EventB ts: 1616686386000

EventB ts: 1616686387000

EventB ts: 1616686388000

EventB ts: 1616686389000

EventA ts: 1616686390000

每个EventA密钥保证只到达一次。

假设像上面这样的联接操作,它生成了1个EventA和4个EventB,成功地联接并收集在MyJoinFunction中。现在,我想做的是,立即访问这些值,并执行一些逻辑,使EventA正好匹配一个EventB。例如,对于上述数据集,我需要(EventA1616686390000EventB1616686387000(。

将为每个(EventAEventB(对调用MyJoinFunction,但我希望在此之后执行一个操作,该操作允许我访问迭代器,以便我可以查看每个EventA的所有EventB事件。

我知道我可以在连接后应用另一个窗口操作来对所有对进行分组,但我希望在连接成功后立即进行。因此,如果可能的话,我希望避免添加另一个窗口,因为我的窗口已经很大了(30天(。

Flink是这个用例的正确选择,还是我完全错了?

这可以作为KeyedCoProcessFunction来实现。您可以通过它们的公共密钥为两个流设置密钥,将它们连接起来,然后一起处理这两个流。

您可以使用ListState来存储来自B的事件(对于给定的密钥(,使用ValueState来存储a的事件(同样,对于给定密钥(。您可以使用事件时间计时器来了解查看ListState中的B事件并生成结果的时间。完成后不要忘记清除状态。

如果您不熟悉Flink API的这一部分,那么流程函数教程应该会有所帮助。

相关内容

  • 没有找到相关文章