我有点难以理解 Flink 触发器的工作原理。我的数据流包含具有会话 ID 的事件,我根据该会话 ID 聚合了这些事件。每个会话将包含一个"已开始"和一个"已结束"事件,但有时"已结束"事件将丢失。
为了处理这个问题,我设置了一个触发器,每当处理结束的事件时,它都会发出聚合会话。但是,如果 2 分钟内没有事件从该会话到达,我想发出到目前为止我们聚合的任何内容(我们发送事件的应用程序每分钟发送一次检测信号,因此如果我们没有收到任何事件,会话将被视为丢失(。
我已经设置了以下触发函数:
public class EventTimeProcessingTimeTrigger extends Trigger<HashMap, TimeWindow> {
private final long sessionTimeout;
private long lastSetTimer;
// Max session length set to 1 day
public static final long MAX_SESSION_LENGTH = 1000l * 86400l;
// End session events
private static ImmutableSet<String> endSession = ImmutableSet.<String>builder()
.add("Playback.Aborted")
.add("Playback.Completed")
.add("Playback.Error")
.add("Playback.StartAirplay")
.add("Playback.StartCasting")
.build();
public EventTimeProcessingTimeTrigger(long sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}
@Override
public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
lastSetTimer = ctx.getCurrentProcessingTime() + sessionTimeout;
ctx.registerProcessingTimeTimer(lastSetTimer);
if(endSession.contains(element.get(Field.EVENT_TYPE))) {
return TriggerResult.FIRE_AND_PURGE;
}
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return TriggerResult.FIRE_AND_PURGE;
}
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
return time == window.maxTimestamp() ?
TriggerResult.FIRE_AND_PURGE :
TriggerResult.CONTINUE;
}
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
ctx.deleteProcessingTimeTimer(lastSetTimer);
}
@Override
public boolean canMerge() {
return true;
}
@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime() + sessionTimeout);
}
}
为了为事件设置水印,我使用应用程序设置的水印,因为appEventTime可能与服务器上的挂钟不同。我像这样提取水印:
DataStream<HashMap> playerEvents = env
.addSource(kafkaConsumerEvents, "playerEvents(Kafka)")
.name("Read player events from Kafka")
.uid("Read player events from Kafka")
.map(json -> DECODER.decode(json, TypeToken.of(HashMap.class))).returns(HashMap.class)
.name("Map Json to HashMap")
.uid("Map Json to HashMap")
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<HashMap>(org.apache.flink.streaming.api.windowing.time.Time.seconds(30))
{
@Override
public long extractTimestamp(HashMap element)
{
long timestamp = 0L;
Object timestampAsObject = (Object) element.get("CanonicalTime");
timestamp = (long)timestampAsObject;
return timestamp;
}
})
.name("Add CanonicalTime as timestamp")
.uid("Add CanonicalTime as timestamp");
现在我觉得奇怪的是,当我在调试中运行代码并在触发器的清除函数中设置断点时,它会不断被调用。即使触发器中没有达到FIRE_AND_PURGE点。所以感觉我完全误解了触发器应该如何工作。而且我的实现根本没有做我认为它正在做的事情。
我想我的问题是,触发器什么时候应该调用清除?这是实现组合 EventTimeTrigger 和 ProcessingTimeTrigger 的正确方法吗?
感谢我能得到的所有帮助。
更新1:(2020-05-29(
为了提供有关如何设置的更多信息。 我按如下方式设置了我的环境:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.failureRateRestart(60, Time.of(60, TimeUnit.MINUTES), Time.of(60, TimeUnit.SECONDS)));
env.enableCheckpointing(5000);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2000);
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
所以我对整个流使用 EventTime。 然后,我创建如下窗口:
DataStream<PlayerSession> playerSessions = sideEvents
.keyBy((KeySelector<HashMap, String>) event -> (String) event.get(Field.SESSION_ID))
.window(ProcessingTimeSessionWindows.withGap(org.apache.flink.streaming.api.windowing.time.Time.minutes(5)))
.trigger(new EventTimeProcessingTimeTrigger(SESSION_TIMEOUT))
.aggregate(new SessionAggregator())
.name("Aggregate events into sessions")
.uid("Aggregate events into sessions");
这种情况很复杂。我犹豫是否要准确预测这段代码将做什么,但我可以解释一些正在发生的事情。
第 1 点:您已将时间特征设置为事件时间,安排时间戳和水印,并在触发器中实现了onEventTime
回调。但是您没有在任何地方创建事件计时器。除非我错过了什么,否则实际上没有使用事件时间或水印。您尚未实现事件时间触发器,我不希望调用onEventTime
。
第 2 点:您的触发器不需要调用清除。Flink 负责在触发器上调用清除,作为清除窗口的一部分。
第 3 点:您的触发器试图反复触发和清除窗口,这似乎不对。我这样说是因为您正在为每个元素创建一个新的处理时间计时器,并且当每个计时器触发时,您正在触发并清除窗口。您可以根据需要随时触发窗口,但只能清除一次窗口,之后它就消失了。
第 4 点:会话窗口是一种特殊的窗口,称为合并窗口。当会话合并时(当事件到达时,这种情况一直发生(,它们的触发器将被合并,其中一个触发器将被清除。这就是为什么你看到清晰如此频繁地被召唤。
建议:由于您有每分钟一次的保持连接,并打算在 2 分钟不活动后关闭会话,因此似乎您可以将会话间隔设置为 2 分钟,这样可以避免相当多的事情使事情变得如此复杂。让会话窗口执行其设计目的。
假设这行得通,那么你可以简单地扩展 Flink 的ProcessingTimeTrigger
并覆盖它的onElement
方法来做到这一点:
@Override
public TriggerResult onElement(HashMap element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (endSession.contains(element.get(Field.EVENT_TYPE))) {
return TriggerResult.FIRE_AND_PURGE;
}
return super(element, timestamp, window, ctx);
}
以这种方式,窗口将在两分钟不活动后触发,或者由显式会话结束事件触发。
你应该能够简单地继承ProcessingTimeTrigger
行为的其余部分。
如果要使用事件时间,请使用EventTimeTrigger
作为超类,并且您必须找到一种方法来确保即使流空闲时水印也能取得进展。请参阅此答案以了解如何处理。
同样的问题
我已经将时间特征设置为处理时间和触发器:
//the trigger
.trigger(PurgingTrigger.of(TimerTrigger.of(Time.seconds(winSec))))
以下触发函数:
//override the ProcessingTimeTrigger behavior
public class TimerTrigger<W extends Window> extends Trigger<Object, W> {
private static final long serialVersionUID = 1L;
private final long interval;
private final ReducingStateDescriptor<Long> stateDesc;
private TimerTrigger(long winInterValMills) { //window
this.stateDesc = new ReducingStateDescriptor("fire-time", new TimerTrigger.Min(), LongSerializer.INSTANCE);
this.interval = winInterValMills;
}
public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
}
long now = System.currentTimeMillis();
ReducingState<Long> fireTimestamp = (ReducingState) ctx.getPartitionedState(this.stateDesc);
if (fireTimestamp.get() == null) {
long time = Math.max(timestamp, window.maxTimestamp()) + interval;
if (now-window.maxTimestamp()>interval){ // fire late
time = (now-now%1000) + interval-1;
}
ctx.registerProcessingTimeTimer(time);
fireTimestamp.add(time);
return TriggerResult.CONTINUE;
} else {
return TriggerResult.CONTINUE;
}
}
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
if (time == window.maxTimestamp()){
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}
public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = (ReducingState) ctx.getPartitionedState(this.stateDesc);
if (((Long) fireTimestamp.get()).equals(time)) {
fireTimestamp.clear();
long maxTimestamp = Math.max(window.maxTimestamp(), time); //maybe useless
if (maxTimestamp == time) {
maxTimestamp = time + this.interval;
}
fireTimestamp.add(maxTimestamp);
ctx.registerProcessingTimeTimer(maxTimestamp);
return TriggerResult.FIRE;
} else {
return TriggerResult.CONTINUE;
}
}
public void clear(W window, TriggerContext ctx) throws Exception {
ReducingState<Long> fireTimestamp = (ReducingState) ctx.getPartitionedState(this.stateDesc);
long timestamp = (Long) fireTimestamp.get();
ctx.deleteProcessingTimeTimer(timestamp);
fireTimestamp.clear();
}
public boolean canMerge() {
return true;
}
public void onMerge(W window, OnMergeContext ctx) {
ctx.mergePartitionedState(this.stateDesc);
}
@VisibleForTesting
public long getInterval() {
return this.interval;
}
public String toString() {
return "TimerTrigger(" + this.interval + ")";
}
public static <W extends Window> TimerTrigger<W> of(Time interval) {
return new TimerTrigger(interval.toMilliseconds());
}
private static class Min implements ReduceFunction<Long> {
private static final long serialVersionUID = 1L;
private Min() {
}
public Long reduce(Long value1, Long value2) throws Exception {
return Math.min(value1, value2);
}
}
}