我正在尝试使用一些网络监视器工作进行flink。我的目标是计算每个src_ip
的不同dst_ip
。
我的以下代码有效,但性能真的很差。似乎每个滑动窗口都会重新计算所有事件,但这应该是不必要的。
例如,我们有准时的事件秒 1 - 600。Flink 可以得到每秒的累加器,所以我们每秒有 600 个累加器。当第一个滑动窗口到期时,flink 只需合并 1-300 的累加器,并销毁第二个 1 的累加器。此窗口也可以在最后一秒之前预合并 1-299。当第二个滑动窗口到期时,flink 只需合并 2-301 的累加器,并销毁第二个 2 的累加器。等等.....
这种方式将比将事件分配给多个窗口并计算每个窗口的聚合要高效得多。
flink 支持这个吗?我可以用flink自己获得类似的功能吗?
多谢!
public static class AverageAccumulator2 {
String key;
Set<String> target;
AverageAccumulator2() {
target = new HashSet<>();
}
}
public static class Average2 implements AggregateFunction<ObjectNode, AverageAccumulator2, Tuple3<String, Long, Set<String>>> {
@Override
public AverageAccumulator2 createAccumulator() {
return new AverageAccumulator2();
}
@Override
public AverageAccumulator2 add(ObjectNode value, AverageAccumulator2 accumulator) {
accumulator.key = value.get("value").get("src_ip").asText();
accumulator.target.add(value.get("value").get("dst_ip").asText());
return accumulator;
}
@Override
public Tuple3<String, Long, Set<String>> getResult(AverageAccumulator2 accumulator) {
return new Tuple3<>(accumulator.key, (long) accumulator.target.size(), accumulator.target);
}
@Override
public AverageAccumulator2 merge(AverageAccumulator2 a, AverageAccumulator2 b) {
a.target.addAll(b.target);
return a;
}
}
final SingleOutputStreamOperator<Tuple3<String, Long, Set<String>> > process2 =
stream.keyBy(value -> value.get("value").get("sip").asText())
.timeWindow(Time.seconds(300),Time.seconds(1))
.aggregate(new Average2());
正如你所观察到的,Flink 并没有尝试优化滑动窗口。使用细粒度滑动,这确实变得非常昂贵。
你可以做的是实现你自己的逻辑,用于使用 ProcessFunction 处理状态和计时器 - 你可以像你概述的那样实现这一点。您将有一个 processElement 方法,该方法用于更新将用于累积结果的数据结构,以及一个 onTimer 方法,该方法每秒触发一次,将部分结果合并在一起,然后将结果发送到下游。