我有以下情况:假设有 20 个传感器正在向我发送流源。我对流应用键(传感器 ID(并执行一些操作,例如平均值等。这是实现的,并且运行良好(使用 Flink Java API(。
最初一切顺利,所有的传感器都在向我发送饲料。一段时间后,可能会发生几个传感器开始行为不端的情况,我开始从它们那里获得不规则的馈送,例如,我从 18 个传感器接收馈送,但 2 个传感器长时间不向我发送馈送。
我们可以假设我已经知道 sensorId 的固定列表(可能是硬编码的/或在数据库中(。如何识别哪两个未发送源?在哪里可以获得要与数据库中的列表进行比较的 keyId 列表?
如果我没有得到提要,我想发出警报(例如 2 分钟、5 分钟、10 分钟等,优先级越来越高(。
有没有人使用 flink 流/模式实现了这样的场景?请提出任何建议。
从技术上讲,您可以使用ProcessFunction
和计时器。
您可以简单地为每条记录注册计时器,并在收到数据时重置它。如果您安排计时器在 5 分钟处理时间后运行,这基本上意味着如果您没有收到数据,它将调用函数onTimer
,您可以从中简单地发出一些警报。可以重新注册已触发警报的计时器,以允许发出具有更高严重性的警报。
请注意,这仅在最初所有传感器都正常工作的情况下才有效。具体来说,它只会针对至少见过一次的密钥发出警报。但是从您的描述来看,它似乎可以解决您的问题。
我碰巧有一个例子。它需要一些调整以适应您的用例,但应该可以让您入门。
public class TimeoutFunction extends KeyedProcessFunction<String, Event, String> {
private ValueState<Long> lastModifiedState;
static final int TIMEOUT = 2 * 60 * 1000; // 2 minutes
@Override
public void open(Configuration parameters) throws Exception {
// register our state with the state backend
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));
}
@Override
public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
// update our state and timer
Long current = lastModifiedState.value();
if (current != null) {
ctx.timerService().deleteEventTimeTimer(current + TIMEOUT);
}
current = max(current, event.timestamp());
lastModifiedState.update(current);
ctx.timerService().registerEventTimeTimer(current + TIMEOUT);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// emit alert
String deviceId = ctx.getCurrentKey();
out.collect(deviceId);
}
}
这假设一个主程序执行以下操作:
DataStream<String> result = stream
.assignTimestampsAndWatermarks(new MyBoundedOutOfOrdernessAssigner(...))
.keyBy(e -> e.deviceId)
.process(new TimeoutFunction());
正如@Dominik所说,这只会对至少见过一次的键发出警报。您可以通过引入事件的辅助源来解决此问题,该事件源为每个应该存在的源创建一个人工事件,并将该流与主源合并。
现在这个模式对我来说非常清楚。我已经实施了该解决方案,它就像魅力一样工作。
如果有人需要代码,那么我很乐意分享