Flink / Spark 流 - 跟踪用户不活动



我是 Flink 的新手,有一个我不知道如何处理的用例。

我有活动要来

{
"id" : "AAA",
"event" : "someEvent",
"eventTime" : "2019/09/14 14:04:25:235"
}

我想创建一个表(弹性/预言机(来跟踪用户不活动。

id ||lastEvent || lastEventTime || inactivityTime

我的最终目标是在某个用户组处于活动状态超过 X 分钟时发出警报。

此表应每 1 分钟更新一次。

事先不知道我所有的身份证,新的ID可以随时出现。

我想也许只是使用简单的进程函数来发出事件(如果存在(或者发出时间戳(这将更新不活动列(。

问题

  1. 关于我的解决方案 - 我仍然需要另一段代码来检查事件是否为 null 并相应地更新。如果为 null -->则更新为非活动状态。否则更新上一个事件。 可以/应该在同一个闪烁/火花作业中编写此代码吗?
  2. 如何处理新 ID?

  3. 另外,如何在火花结构化流中处理此用例?

    input
    .keyBy("id")
    .window(TumblingEventTimeWindows.of(Time.minutes(1)))
    .process(new MyProcessWindowFunction());
    public class MyProcessWindowFunction
    extends ProcessWindowFunction<Tuple2<String, Long>, Tuple2<Long, Object>> {
    @Override
    public void process(String key, Context context, Iterable<Tuple2<String, Long>> input, Collector<Tuple2<Long, Object>> out) {
    Object obj = null;
    while(input.iterator().hasNext()){
    obj = input.iterator().next();
    }
    if (obj!=null){
    out.collect(Tuple2.of(context.timestamp(), obj));
    } else {
    out.collect(Tuple2.of(context.timestamp(), null));
    }
    }
    

我会使用KeyedProcessFunction而不是窗口API来满足这些要求。[1] 流由 id 键控。

KeyedProcessFunction#process为流的每个记录调用,您可以保留状态和计划计时器。您可以每分钟安排一个计时器,并为每个 od 存储状态中看到的最后一个事件。当计时器触发时,您可以发出事件并清除状态。

就个人而言,我只会存储数据库中看到的最后一个事件,并在查询数据库时计算不活动时间。通过这种方式,你可以在每次发射后清除状态,并且可能无界的键空间不会导致 Flink 中的每个托管状态都增长。

希望这有帮助。

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html

相关内容

  • 没有找到相关文章

最新更新