我是 Flink 的新手,有一个我不知道如何处理的用例。
我有活动要来
{
"id" : "AAA",
"event" : "someEvent",
"eventTime" : "2019/09/14 14:04:25:235"
}
我想创建一个表(弹性/预言机(来跟踪用户不活动。
id ||lastEvent || lastEventTime || inactivityTime
我的最终目标是在某个用户组处于活动状态超过 X 分钟时发出警报。
此表应每 1 分钟更新一次。
我事先不知道我所有的身份证,新的ID可以随时出现。
我想也许只是使用简单的进程函数来发出事件(如果存在(或者发出时间戳(这将更新不活动列(。
问题
- 关于我的解决方案 - 我仍然需要另一段代码来检查事件是否为 null 并相应地更新。如果为 null -->则更新为非活动状态。否则更新上一个事件。 可以/应该在同一个闪烁/火花作业中编写此代码吗?
如何处理新 ID?
另外,如何在火花结构化流中处理此用例?
input .keyBy("id") .window(TumblingEventTimeWindows.of(Time.minutes(1))) .process(new MyProcessWindowFunction()); public class MyProcessWindowFunction extends ProcessWindowFunction<Tuple2<String, Long>, Tuple2<Long, Object>> { @Override public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<Tuple2<Long, Object>> out) { Object obj = null; while(input.iterator().hasNext()){ obj = input.iterator().next(); } if (obj!=null){ out.collect(Tuple2.of(context.timestamp(), obj)); } else { out.collect(Tuple2.of(context.timestamp(), null)); } }
我会使用KeyedProcessFunction
而不是窗口API来满足这些要求。[1] 流由 id 键控。
KeyedProcessFunction#process
为流的每个记录调用,您可以保留状态和计划计时器。您可以每分钟安排一个计时器,并为每个 od 存储状态中看到的最后一个事件。当计时器触发时,您可以发出事件并清除状态。
就个人而言,我只会存储数据库中看到的最后一个事件,并在查询数据库时计算不活动时间。通过这种方式,你可以在每次发射后清除状态,并且可能无界的键空间不会导致 Flink 中的每个托管状态都增长。
希望这有帮助。
[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html