如何在Flink中实现一个触发器,该触发器缓冲到超时,并在超时后触发?
如果窗口中至少有一个元素,我希望注册触发器,然后缓冲到一秒钟,并在一秒钟后触发。如果窗口中没有元素,那么触发器不会自行注册,所以我不希望看到任何输出。
我不希望触发器每秒产生大量流量,无论窗口中是否存在元素。另一方面,说窗口中只有一个元素,我不希望它坐在那里,直到水印或永远。相反,我希望有一个超时,这样我就可以在一秒钟后看到那个元素。
ProcessingTimeTrigger.create()
会这样做吗?如果是,ProcessingTimeTrigger.create()
和CountinousProcessingTimeTrigger
之间有什么不同?
一个正常的一秒钟长的处理时间窗口将为您提供一个窗口,其中包含在一秒钟内发生的所有事件,对于其中至少有一个事件的任何一秒钟。但是这个窗口不会与第一个事件对齐;它将与一天中的时间时钟对齐。因此,例如,如果窗口中的第一个事件发生在给定秒的一半,那么该窗口将只包括第一个事件之后500毫秒的事件。
ProcessingTimeTrigger
会在窗口的末尾触发一次。CountinousProcessingTimeTrigger
以某个指定的速率重复激发。
为了精确地获得您所描述的语义,您需要一个自定义触发器。您可以执行与OneSecondIntervalTrigger示例类似的操作,只是您希望从使用事件时间切换到处理时间,并且只触发一次,而不是重复触发。
这会给你留下这样的东西:
public static class OneSecondIntervalTrigger extends Trigger<SensorReading, TimeWindow> {
@Override
public TriggerResult onElement(SensorReading r, long ts, TimeWindow w, TriggerContext ctx) throws Exception {
// firstSeen will be false if not set yet
ValueState<Boolean> firstSeen = ctx.getPartitionedState(
new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
// register initial timer only for first element
if (firstSeen.value() == null) {
// FIRE the window 1000 msec after the first event
long now = ctx.getCurrentProcessingTime();
ctx.registerProcessingTimeTimer(now + 1000);
fireSeen.update(true);
}
// Continue. Do not evaluate window now
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
// Continue. We don't use event time timers
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long ts, TimeWindow w, TriggerContext ctx) throws Exception {
// Evaluate the window now
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public void clear(TimeWindow w, TriggerContext ctx) throws Exception {
// Clear trigger state
ValueState<Boolean> firstSeen = ctx.getPartitionedState(
new ValueStateDescriptor<>("firstSeen", Types.BOOLEAN));
firstSeen.clear();
}
}