FLINK-在时间限制内处理连续事件



我有一个用例,我认为我需要一些帮助。因为我是流媒体和翻转的新手,所以我会尝试在尝试实现的目标上非常描述性。抱歉,如果我不使用正式和正确的语言。

我的代码将在java中

TL:DR

  1. 在一定时间限制内的同一密钥的组事件。
  2. 在这些事件中,仅在最接近的两个(时域)事件中创建一个结果事件。
  3. 这需要(我认为)为每一个事件打开一个窗口。
  4. 如果您要展望批处理解决方案,您将最了解我的问题。

背景:

  1. 我有来自Kafka流的传感器的数据。
  2. 我需要使用EventTime,因为该数据未记录。给我90%的活动的迟到大约是1分钟。
  3. 我将这些事件按一些键进行分组。

我想做什么:

  1. 取决于某些事件的字段 - 我想将2个事件"加入/混音"到一个新事件("结果事件")。
  2. 第一个条件是那些连续事件彼此30秒内。
  3. 下一个条件只是检查一些字段值,而不是决定。

我的psuedo解决方案:

  1. 为每个事件打开一个新窗口。该窗口应为1分钟。
  2. 对于那分钟内发生的每个事件 - 我想检查其事件时间,看看它是否是初始窗口事件的30秒。如果是 - 检查其他条件并省略新的结果流。

问题 - 当新事件出现时,需要:

  1. 为自己创建一个新窗口。
  2. 从几个可能的窗口中仅加入一个窗口,距离它30秒。

问题:

可能吗?

换句话说,我的连接仅在两个"连续"事件之间。

非常感谢。

也许显示**批次案例的解决方案将显示我想做的最好的事情:**

for i in range(grouped_events.length):
    event_A = grouped_events[i]
    event_B = grouped_events[i+1]
    if event_B.get("time") - event_A.get("time") < 30:
        if event_B.get("color") == event_A.get("color"):
            if event_B.get("size") > event_A.get("size"):
                create_result_event(event_A, event_B)

我的(天真)在Java中的flink

在flink中尝试

**总和函数只是我函数创建一个新结果对象的位置持有人...

  1. 第一个解决方案只是执行一个简单的时间窗口,并通过某个字段进行总结
  2. 第二,试图在窗口上执行一些过程功能,也许在那里迭代所有事件并检查我的条件吗?

    DataStream
    .keyBy(threeEvent -> threeEvent.getUserId())
    .window(TumblingEventTimeWindows.of(Time.seconds(60)))
    .sum("size")
    .print();
    
    DataStream
    .keyBy(threeEvent -> threeEvent.getUserId())
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new processFunction());
    
    public static class processFunction extends ProcessWindowFunction<ThreeEvent, Tuple3<Long, Long, Float>, Long, TimeWindow> {
        @Override
        public void process(Long key, Context context, Iterable<ThreeEvent> threeEvents, Collector<Tuple3<Long, Long, Float>> out) throws Exception {
            Float sumOfSize = 0F;
            for (ThreeEvent f : threeEvents) {
                sumOfSize += f.getSize();
            }
            out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfTips));
        }
    }
    

当然,您可以使用窗口来创建您进行排序和分析的迷你批次,但是很难正确处理窗口边界(如果应该配对的事件,该怎么办降落在不同的窗户?)

这看起来更容易使用键入的流和一个状态的flatmap来完成。只需使用RichFlatMapFunction,然后使用一个键入的状态(一个估计),以记住每个键的先前事件。然后,在处理每个事件时,将其与已保存的事件进行比较,如果应发生这种情况,就会产生结果,然后更新状态。

您可以在Flink培训和Flink文档中阅读有关使用Flink的键控状态的信息。

关于您的用例,我关心的一件事是您的事件是否可能到达。是否需要首先通过时间戳将事件排序正确的情况?那不是微不足道的。如果这是一个问题,那么我建议您将Flink SQL与Match_Revenize或CEP库一起使用,或者CEP库是为了在事件流上进行模式识别而设计的,并且会为您分类流(您只有提供时间戳和水印)。

此查询可能不是完全正确的,但希望传达如何使用Match识别这样的事情的风味:

SELECT * FROM Events
MATCH_RECOGNIZE (
  PARTITION BY userId
  ORDER BY eventTime
  MEASURES
    A.userId as userId,
    A.color as color,
    A.size as aSize,
    B.size as bSize
  AFTER MATCH SKIP PAST LAST ROW
  PATTERN (A B)
  DEFINE
    A AS true,
    B AS ( timestampDiff(SECOND, A.eventTime, B.eventTime) < 30) 
           AND A.color = B.color 
           AND A.size < B.size )
);

这也可以自然地使用CEP进行,其中比较连续事件的基础是使用迭代条件,并且您可以使用within子句处理时间约束。

相关内容

  • 没有找到相关文章

最新更新