上下文:
有一个 ds: DataStream[Event] ,有 3 种类型的事件:A、B、C。
问题 :如何监控不同事件在不同时间间隔的发生?
即假设一秒钟中只有一个事件,并且它是有序的。
DS = A, B, C, A, C, B, A, C,A, B, C....
然后
每 3 秒出现 A 的次数为:
1、1、1、1、1、1、2、1、1...每 2 秒出现 B 的次数为:
1、1、0、0、1、1、0、0、1、1...每 4 秒出现 C 的次数为:
您没有明确说明,但我假设您必须在不同窗口时间内跟踪的唯一事件类型数量是任意大的。我进一步假设您已经或可以轻松地为每个事件类型创建一个唯一的字符串。在这种情况下:
使用 MapFunction 将事件转换为元组,其中字符串是事件 ID。
使用 DataStream.split 按事件 ID 拆分流。
棘手的一点通过在 for 循环中调用 SplitStream.select 并遍历 EventID 来创建多个数据流。
同样在 for 循环中,将窗口函数应用于每个流。
最后,仍然在 for 循环中,将每个数据流与前一个数据流联合起来(您可以为此重复使用相同的变量(
flink 的文档几乎从不定义循环中的运算符,但这样做是完全合法的。
以下是 for 循环的胆量应该是什么样子的:
DataStream<String> finalText=null;//gets rid of "might not be defined" warnings
for (Integer i = 0; i<3; i++){
DataStream<String> tempStream =
splitStream.select(i.toString())
.map(new passthroughMapFunction<String>())
//window function can go here
.name("Map"+i);
if (finalText==null){
finalText = tempStream;
} else {
finalText = finalText.union(tempStream);
}
}
val aDs = all.filter(_.type == "A")
val bDs = all.filter(_.type == "B")
val cDs = all.filter(_.type == "C")
然后将您想要的任何内容应用于不同的数据流
如果筛选谓词计算量很大,则应事先在映射中预先计算它,以使特定于类型的筛选器尽可能轻量级。