我有一个用例,我认为我需要一些帮助。因为我是流媒体和翻转的新手,所以我会尝试在尝试实现的目标上非常描述性。抱歉,如果我不使用正式和正确的语言。
我的代码将在java中
TL:DR
- 在一定时间限制内的同一密钥的组事件。
- 在这些事件中,仅在最接近的两个(时域)事件中创建一个结果事件。
- 这需要(我认为)为每一个事件打开一个窗口。
- 如果您要展望批处理解决方案,您将最了解我的问题。
背景:
- 我有来自Kafka流的传感器的数据。
- 我需要使用EventTime,因为该数据未记录。给我90%的活动的迟到大约是1分钟。
- 我将这些事件按一些键进行分组。
我想做什么:
- 取决于某些事件的字段 - 我想将2个事件"加入/混音"到一个新事件("结果事件")。
- 第一个条件是那些连续事件彼此30秒内。
- 下一个条件只是检查一些字段值,而不是决定。
我的psuedo解决方案:
- 为每个事件打开一个新窗口。该窗口应为1分钟。
- 对于那分钟内发生的每个事件 - 我想检查其事件时间,看看它是否是初始窗口事件的30秒。如果是 - 检查其他条件并省略新的结果流。
问题 - 当新事件出现时,需要:
- 为自己创建一个新窗口。
- 从几个可能的窗口中仅加入一个窗口,距离它30秒。
问题:
可能吗?
换句话说,我的连接仅在两个"连续"事件之间。
非常感谢。
也许显示**批次案例的解决方案将显示我想做的最好的事情:**
for i in range(grouped_events.length):
event_A = grouped_events[i]
event_B = grouped_events[i+1]
if event_B.get("time") - event_A.get("time") < 30:
if event_B.get("color") == event_A.get("color"):
if event_B.get("size") > event_A.get("size"):
create_result_event(event_A, event_B)
我的(天真)在Java中的flink
在flink中尝试**总和函数只是我函数创建一个新结果对象的位置持有人...
- 第一个解决方案只是执行一个简单的时间窗口,并通过某个字段进行总结
第二,试图在窗口上执行一些过程功能,也许在那里迭代所有事件并检查我的条件吗?
DataStream .keyBy(threeEvent -> threeEvent.getUserId()) .window(TumblingEventTimeWindows.of(Time.seconds(60))) .sum("size") .print(); DataStream .keyBy(threeEvent -> threeEvent.getUserId()) .window(TumblingEventTimeWindows.of(Time.seconds(5))) .process(new processFunction()); public static class processFunction extends ProcessWindowFunction<ThreeEvent, Tuple3<Long, Long, Float>, Long, TimeWindow> { @Override public void process(Long key, Context context, Iterable<ThreeEvent> threeEvents, Collector<Tuple3<Long, Long, Float>> out) throws Exception { Float sumOfSize = 0F; for (ThreeEvent f : threeEvents) { sumOfSize += f.getSize(); } out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfTips)); } }
当然,您可以使用窗口来创建您进行排序和分析的迷你批次,但是很难正确处理窗口边界(如果应该配对的事件,该怎么办降落在不同的窗户?)
这看起来更容易使用键入的流和一个状态的flatmap来完成。只需使用RichFlatMapFunction,然后使用一个键入的状态(一个估计),以记住每个键的先前事件。然后,在处理每个事件时,将其与已保存的事件进行比较,如果应发生这种情况,就会产生结果,然后更新状态。
您可以在Flink培训和Flink文档中阅读有关使用Flink的键控状态的信息。
关于您的用例,我关心的一件事是您的事件是否可能到达。是否需要首先通过时间戳将事件排序正确的情况?那不是微不足道的。如果这是一个问题,那么我建议您将Flink SQL与Match_Revenize或CEP库一起使用,或者CEP库是为了在事件流上进行模式识别而设计的,并且会为您分类流(您只有提供时间戳和水印)。
此查询可能不是完全正确的,但希望传达如何使用Match识别这样的事情的风味:
SELECT * FROM Events
MATCH_RECOGNIZE (
PARTITION BY userId
ORDER BY eventTime
MEASURES
A.userId as userId,
A.color as color,
A.size as aSize,
B.size as bSize
AFTER MATCH SKIP PAST LAST ROW
PATTERN (A B)
DEFINE
A AS true,
B AS ( timestampDiff(SECOND, A.eventTime, B.eventTime) < 30)
AND A.color = B.color
AND A.size < B.size )
);
这也可以自然地使用CEP进行,其中比较连续事件的基础是使用迭代条件,并且您可以使用within
子句处理时间约束。