私有datastream buysidevolumewma(datastream buypressuretradestream({
Integer windowSize = 3;
Integer windowslide = 1;
DataStream<Double> buySideVolumeWMAStream = buyPressureTradeStream.countWindowAll(windowSize, windowslide)
.apply(new AllWindowFunction<String, Double, GlobalWindow>() {
@Override
public void apply(GlobalWindow window, Iterable<String> values, Collector<Double> out)
throws Exception {
Double buySideVolumeWMA = 0.0;
Integer weight = windowSize;
Integer numerator = 1;
for (String tradeString : values) {
JSONObject json = new JSONObject(tradeString);
Double tradeVolume = (Double) json.get("Volume");
buySideVolumeWMA += ((tradeVolume * numerator) / weight);
slf4jLogger.info("tradeVolume " + tradeVolume + " , " + "numerator , " + numerator
+ " weight , " + weight + " buySideVolumeWMA " + buySideVolumeWMA);
numerator++;
}
numerator = 1;
out.collect(buySideVolumeWMA / 2);
buySideVolumePressure = buySideVolumeWMA / 2;
// slf4jLogger.info("buySideVolumePressure :" +
// buySideVolumePressure);
buySideVolumeWMAStream.print().setParallelism(5);
return buySideVolumeWMAStream;
}
======================================================================================在此程序中,我使用的窗口尺寸为3和幻灯片大小。计数3的流数据,然后仅启动幻灯片。但是,发生的事情是,我的程序在接收第一个数据时立即开始滑动,然后它幻灯片幻灯片幻灯计数3的数据,然后滑动1?
您可以在窗口中添加偏移量。这是窗口命令的第三个参数。这样,您可以在以后开始。
文档中的示例:
// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>);
了解更多信息:https://ci.apache.org/projects/flink/flink/flink-docs-release-1.2/dev/windows.html
据我所知,这不是2019年11月和Flink 1.9.1的滑动窗口的功能。我知道这是因为窗口对象是分开的,并且不共享任何状态。例如,Windows中的对象被复制和存储一次为每个窗口,如果使用键流。
。以下过滤器保持足够的状态以忽略其收到的前N消息。如果使用键流(如.keyBy(...)
中(,每个键都将保留一个单独的计数器,因为这是Flink管理估值对象的方式。
/**
* This filter suppresses the first n messages (inclusive of n). This behavior may be desired for use with sliding
* windows when no output is desired until the full size of the window is reached.
*
* Example usage:
* .filter(new SuppressFirstNFromSlidingWindow[(String, Int)](5))
*/
class SuppressFirstNFromSlidingWindow[T](nToSuppress: Int) extends RichFilterFunction[T] {
private var state_allowAll: ValueState[Boolean] = _
private var state_numberSkipped: ValueState[Int] = _
override def filter(value: T): Boolean = {
if (state_allowAll.value()) return true
val numberSkipped = state_numberSkipped.value()
if (numberSkipped < nToSuppress) {
state_numberSkipped.update(numberSkipped + 1)
false
} else {
state_allowAll.update(true)
true
}
}
override def open(parameters: Configuration): Unit = {
state_allowAll = getRuntimeContext.getState(
new ValueStateDescriptor[Boolean]("allowAll", createTypeInformation[Boolean])
)
state_numberSkipped = getRuntimeContext.getState(
new ValueStateDescriptor[Int]("numberSkipped", createTypeInformation[Int])
)
}
}