我正试图想出一个解决方案,该解决方案涉及在联接操作之后应用一些逻辑,以便在多个EventB
s中从streamB
中选择一个事件。这就像一个reduce函数,但它只返回1个元素,而不是增量执行。因此,最终结果将是单个(EventA
,EventB
(对,而不是1个EventA
和多个EventB
的叉积。
streamA
.keyBy((a: EventA) => a.common_key)
.intervalJoin(
streamB
.keyBy((b: EventB) => b.common_key)
)
.between(Time.days(-30), Time.days(0))
.process(new MyJoinFunction)
数据将被摄取如下(假设它们有相同的密钥(:
EventB ts: 1616686386000
EventB ts: 1616686387000
EventB ts: 1616686388000
EventB ts: 1616686389000
EventA ts: 1616686390000
每个EventA
密钥保证只到达一次。
假设像上面这样的联接操作,它生成了1个EventA
和4个EventB
,成功地联接并收集在MyJoinFunction
中。现在,我想做的是,立即访问这些值,并执行一些逻辑,使EventA
与正好匹配一个EventB
。例如,对于上述数据集,我需要(EventA
1616686390000
,EventB
1616686387000
(。
将为每个(EventA
,EventB
(对调用MyJoinFunction
,但我希望在此之后执行一个操作,该操作允许我访问迭代器,以便我可以查看每个EventA
的所有EventB
事件。
我知道我可以在连接后应用另一个窗口操作来对所有对进行分组,但我希望在连接成功后立即进行。因此,如果可能的话,我希望避免添加另一个窗口,因为我的窗口已经很大了(30天(。
Flink是这个用例的正确选择,还是我完全错了?
这可以作为KeyedCoProcessFunction
来实现。您可以通过它们的公共密钥为两个流设置密钥,将它们连接起来,然后一起处理这两个流。
您可以使用ListState
来存储来自B的事件(对于给定的密钥(,使用ValueState
来存储a的事件(同样,对于给定密钥(。您可以使用事件时间计时器来了解查看ListState中的B事件并生成结果的时间。完成后不要忘记清除状态。
如果您不熟悉Flink API的这一部分,那么流程函数教程应该会有所帮助。