如何在 Apache Flink 中为会话窗口分配 id?
最终,我想在会话窗口打开时使用会话窗口 ID 逐个丰富事件(我不想等到窗口关闭后再发出丰富的事件(。
我尝试使用AggregateFunction来做到这一点,但是我认为merge((没有像我预期的那样工作。它似乎用于合并窗口而不是窗格(触发触发(。它似乎从未在我的管道中被调用过。因此,触发器之间似乎没有共享状态!
会话窗口 ID 将是落入窗口的第一个事件的时间戳(由于非保证排序,这可能意味着某些事件可能会落入具有较早时间戳的同一会话窗口 - 我对此没意见(。
public class FooSessionState {
private Long sessionCreationTime;
private FooMatch lastMatch;
}
/**
* Aggregator that assigns session ids to elements of a session window
*/
public class SessionIdAssigner implements
AggregateFunction<FooMatch, FooSessionState, FooSessionEvent> {
static final long serialVersionUID = 0L;
@Override
public FooSessionState createAccumulator() {
return new FooSessionState();
}
@Override
public FooSessionState add(FooMatch value, FooSessionState sessionState) {
if (sessionState.getSessionCreationTime() == null) {
sessionState.setSessionCreationTime(value.getReport().getTimestamp());
}
sessionState.setLastMatch(value);
return sessionState;
}
@Override
public FooSessionEvent getResult(FooSessionState accumulator) {
FooSessionEvent sessionEvent = new FooSessionEvent();
sessionEvent.setFooMatch(accumulator.getLastMatch());
sessionEvent.setSessionCreationTime(accumulator.getSessionCreationTime());
return sessionEvent;
}
@Override
public FooSessionState merge(FooSessionState a, FooSessionState b) {
if ( a.getSessionCreationTime() != null) {
b.setSessionCreationTime(a.getSessionCreationTime());
}
return b;
}
}
我的计划是按如下方式使用它:
stream.keyBy(new FooMatchKeySelector())
.window(EventTimeSessionWindows.withGap(Time.milliseconds(config.getFooSessionWindowTimeout())))
.trigger(PurgingTrigger.of(CountTrigger.of(1L)))
.aggregate(new SessionIdAssigner())
我认为会话窗口不适合您想要实现的目标。它们旨在聚合每个会话的事件,而不是丰富每个事件,即它们计算结果并在窗口关闭时发出它。正如您所注意到的,会话窗口的工作原理是为每个事件创建一个新窗口并合并重叠的窗口。之所以选择此设计,是因为事件可能会无序到达。因此,您可能会有两个窗口,这两个窗口稍后通过桥接事件连接。
我建议使用收集事件并根据时间戳对它们进行排序的ProcessFunction
来实现逻辑。收到水印时,它会发出具有正确会话 ID 的所有收集的事件。因此,您仅将两个水印之间的事件保留在状态中。除了这些事件之外,还需要保留上次发出的事件的时间戳和上次发出的会话 ID 以执行正确的会话化。