How to make a sliding window in Apache Flink slide only after the window size is reached



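import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
import org.json.JSONObject;

// slf4jLogger and buySideVolumePressure are fields of the enclosing class (not shown).
private DataStream<Double> buySideVolumeWMA(DataStream<String> buyPressureTradeStream) {

    final int windowSize = 3;   // number of elements per window
    final int windowSlide = 1;  // fire after every new element
    DataStream<Double> buySideVolumeWMAStream = buyPressureTradeStream.countWindowAll(windowSize, windowSlide)
            .apply(new AllWindowFunction<String, Double, GlobalWindow>() {
                @Override
                public void apply(GlobalWindow window, Iterable<String> values, Collector<Double> out)
                        throws Exception {
                    double buySideVolumeWMA = 0.0;
                    int weight = windowSize;
                    int numerator = 1; // oldest trade weighted 1, newest weighted windowSize
                    for (String tradeString : values) {
                        JSONObject json = new JSONObject(tradeString);
                        // getDouble avoids a ClassCastException when "Volume" is parsed as an Integer
                        double tradeVolume = json.getDouble("Volume");
                        buySideVolumeWMA += (tradeVolume * numerator) / weight;
                        slf4jLogger.info("tradeVolume " + tradeVolume + ", numerator " + numerator
                                + ", weight " + weight + ", buySideVolumeWMA " + buySideVolumeWMA);
                        numerator++;
                    }
                    out.collect(buySideVolumeWMA / 2);
                    buySideVolumePressure = buySideVolumeWMA / 2;
                    // slf4jLogger.info("buySideVolumePressure :" +
                    // buySideVolumePressure);
                }
            });

    buySideVolumeWMAStream.print().setParallelism(5);
    return buySideVolumeWMAStream;
}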

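In this program I use a window size of 3 and a slide size of 1. I expected the window to first collect 3 elements of the stream and only then start sliding. Instead, the program starts emitting as soon as it receives the first element, rather than waiting for 3 elements and then sliding by 1. How can I make it wait until the window is full?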
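As a rough illustration of why this happens: in Flink, `countWindowAll(size, slide)` is built from a single global window with `CountTrigger.of(slide)` and `CountEvictor.of(size)`, so the window function runs after every `slide` elements on at most the last `size` elements, including the very first ones. The plain-Java model below (the class `CountWindowModel` is hypothetical and has no Flink dependency) is a sketch that reproduces that firing pattern for size 3 and slide 1.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/**
 * Hypothetical plain-Java model of countWindowAll(size, slide): one global window,
 * a trigger that fires every `slide` elements, and an evictor that keeps only
 * the last `size` elements before the window function sees them.
 */
public class CountWindowModel {

    static List<List<Integer>> firings(List<Integer> input, int size, int slide) {
        List<List<Integer>> result = new ArrayList<>();
        Deque<Integer> buffer = new ArrayDeque<>();
        int countSinceLastFire = 0;
        for (int element : input) {
            buffer.addLast(element);
            countSinceLastFire++;
            if (countSinceLastFire == slide) {       // models CountTrigger.of(slide)
                countSinceLastFire = 0;
                while (buffer.size() > size) {        // models CountEvictor.of(size)
                    buffer.removeFirst();
                }
                result.add(new ArrayList<>(buffer));  // contents handed to apply()
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Prints [1], [1, 2], [1, 2, 3], [2, 3, 4], [3, 4, 5], one per line:
        // the first two firings are the "early" partial windows from the question.
        for (List<Integer> window : firings(List.of(1, 2, 3, 4, 5), 3, 1)) {
            System.out.println(window);
        }
    }
}
```

With slide 1, the first `size - 1` firings are partial windows; every firing after that holds exactly `size` elements.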

You can add an offset to the window. It is the third argument of the window assigner. That way, the windows start later.

Example from the documentation:

// sliding processing-time windows offset by -8 hours
input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

More information: https://ci.apache.org/projects/flink/flink/flink-docs-release-1.2/dev/windows.html

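As far as I know, as of November 2019 and Flink 1.9.1, this is not a feature of sliding windows. The reason is that window objects are separate and do not share any state. For example, when a keyed stream is used, the objects in the windows are copied and stored once for each window.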

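The following filter keeps just enough state to ignore the first N messages it receives. If it is used on a keyed stream (as in .keyBy(...)), each key keeps its own counter, because that is how Flink manages value state objects.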

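  import org.apache.flink.api.common.functions.RichFilterFunction
  import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
  import org.apache.flink.configuration.Configuration
  import org.apache.flink.streaming.api.scala._ // provides createTypeInformation

  /**
   * This filter suppresses the first n messages (inclusive of n). This behavior may be desired for use with sliding
   * windows when no output is desired until the full size of the window is reached.
   *
   * Example usage:
   * .filter(new SuppressFirstNFromSlidingWindow[(String, Int)](5))
   */
  class SuppressFirstNFromSlidingWindow[T](nToSuppress: Int) extends RichFilterFunction[T] {
    // Becomes true once nToSuppress messages have been dropped; after that everything passes.
    private var state_allowAll: ValueState[Boolean] = _
    // Number of messages dropped so far (an empty state unboxes to 0 in Scala).
    private var state_numberSkipped: ValueState[Int] = _

    override def filter(value: T): Boolean = {
      if (state_allowAll.value()) return true
      val numberSkipped = state_numberSkipped.value()
      if (numberSkipped < nToSuppress) {
        // Still inside the first n messages: count this one and drop it.
        state_numberSkipped.update(numberSkipped + 1)
        false
      } else {
        // Suppression finished: remember it so later calls take the fast path.
        state_allowAll.update(true)
        true
      }
    }

    override def open(parameters: Configuration): Unit = {
      state_allowAll = getRuntimeContext.getState(
        new ValueStateDescriptor[Boolean]("allowAll", createTypeInformation[Boolean])
      )
      state_numberSkipped = getRuntimeContext.getState(
        new ValueStateDescriptor[Int]("numberSkipped", createTypeInformation[Int])
      )
    }
  }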
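The filter above relies on keyed state (`getRuntimeContext.getState`), which Flink only allows after a `keyBy(...)`. For the non-keyed `countWindowAll` stream in the question, one alternative is to drop partial windows inside the window function itself. The sketch below models the question's weighted moving average in plain Java under that assumption; the class `FullWindowWma` and method `weightedAverage` are hypothetical names, and the input lists stand in for the `values` a window would deliver.

```java
import java.util.List;
import java.util.OptionalDouble;

/**
 * Hypothetical plain-Java sketch: the question's weighted moving average,
 * returning nothing until the window holds windowSize elements.
 */
public class FullWindowWma {

    static OptionalDouble weightedAverage(List<Double> volumes, int windowSize) {
        if (volumes.size() < windowSize) {
            return OptionalDouble.empty(); // partial window: emit nothing
        }
        double weightedSum = 0.0;
        int weight = 1; // oldest element gets weight 1, newest gets windowSize
        for (double volume : volumes) {
            weightedSum += volume * weight;
            weight++;
        }
        // Sum of the weights 1 + 2 + ... + windowSize; for windowSize == 3 this is 6,
        // the same total divisor as the question's "/ weight" followed by "/ 2".
        double weightTotal = windowSize * (windowSize + 1) / 2.0;
        return OptionalDouble.of(weightedSum / weightTotal);
    }

    public static void main(String[] args) {
        // Window contents as countWindowAll(3, 1) would deliver them for volumes 3, 6, 9, 12.
        List<List<Double>> windows = List.of(
                List.of(3.0),
                List.of(3.0, 6.0),
                List.of(3.0, 6.0, 9.0),
                List.of(6.0, 9.0, 12.0));
        for (List<Double> window : windows) {
            // Only the two full windows print: 7.0 and 10.0.
            weightedAverage(window, 3).ifPresent(wma -> System.out.println(window + " -> " + wma));
        }
    }
}
```

In the question's `apply(...)`, the corresponding change is to count the elements of `values` first and `return` without calling `out.collect(...)` when there are fewer than `windowSize`.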
