Flink 滑动计数窗口行为



假设我们有这样的数据结构:

Tuple2<ArryaList<Long>, Integer>

第一个字段是长度为 1 的ArrayList,其中包含时间戳,整数字段是介于 1 和 40 之间的数字,名为channel。目标是使用相同的键(channel(聚合每 400 条消息,并对它们应用ReduceFunction(它只是合并元组第一个字段中 400 条消息的时间戳(。 我将channel字段设置为消息的键,并创建了一个计数窗口 400。例如,如果我们有160000条消息作为输入,它应该输出160000/400 = 400行,并且 计数 窗口按预期工作。问题是当我使用滑动计数窗口时,我的预期行为是:

Flink 为每个channel数字创建逻辑窗口并首次应用ReduceFunction,如果逻辑窗口的长度达到 400,之后每 100 个输入数据,与逻辑窗口的键相同,也会调用窗口中最后 400 条消息的ReduceFunction所以我们应该有:

  • 160000 - 400 = 159600//前 400 个输入将首次调用 reduce 函数
  • 159600 / 100 = 1596//在前 400 个输入之后,对于每 100 个输入 Flink 调用最后 400 个输入的 reduce 函数
  • 1 + 1596 = 1597//输出的行数

但是运行滑动计数窗口,它会输出 1600 行,这些行具有可变长度。(我预计输出长度仅为 400(

要点:长度我的意思是 ArrayList 的大小(Tuple2 的第一个字段(

  • 前40通道-->长度100
  • 第二个40通道-->长度299
  • 第三个40通道-->长度598
  • 第四个40通道-->长度997
  • 剩下的40通道-->长度400

如何证明此类行为的合理性并实现所需的"滑动计数"窗口?

以下是源代码:

DataStream<Tuple2<ArrayList<Long>, Integer>> data ;
data.keyBy(1).countWindow(400, 100)
.reduce(new ReduceFunction<Tuple2<ArrayList<Long>, Integer>>() {
@Override
public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
t0.f0.add(t1.f0.get(0));
return t0;
}
}).writeAsText("results400").setParallelism(1);

更新:根据@DavidAnderson建议,我也尝试在ReduceFunstion中创建一个新的元组而不是修改t0,但它导致了相同的输出。

public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
ArrayList<Long> times = t0.f0;
times.addAll(t1.f0);
return new Tuple2<>(times, t0.f1) ;
}

这是countWindow的实现

public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
return window(GlobalWindows.create())
.evictor(CountEvictor.of(size))
.trigger(CountTrigger.of(slide));
}

它的行为方式与您预期的不太一样。该窗口每 100 个元素(幻灯片(触发一次,无论它是否包含 400 个元素(大小(。大小控制最多保留多少元素。

感谢大卫安德森的建议,将ReduceFunction修改为以下内容可以解决问题。我们应该在ReduceFunction中创建一个新对象:

public Tuple2<ArrayList<Long>, Integer> reduce(Tuple2<ArrayList<Long>, Integer> t0, Tuple2<ArrayList<Long>, Integer> t1) throws Exception {
ArrayList<Long> times = new ArrayList<>();
times.addAll(t0.f0);
times.addAll(t1.f0);

return new Tuple2<>(times, t0.f1) ;
}

请注意,问题中的两种归约方法都会导致不正确的输出。 现在输出如下所示:

  • 前40通道-->长度100
  • 第二个40通道-->长度200
  • 第三条40通道-->长度300
  • 每40个通道残骸-->长度为400

因此,Flink 滑动计数窗口的行为是它调用每个滑动计数输入消息ReduceFunction。因此,在我们有 160000 条输入消息的情况下,结果编号应该是:160000/100 = 1600

相关内容

  • 没有找到相关文章

最新更新