第一个问题
我想学习窗户的时间行为。假设我将使用Processing time
每2秒处理一次数据,当前时间为10:26:25.169
。此时,我部署了作业。
在这种情况下,是否会将每个时间窗口四舍五入为0、2、4秒,依此类推?如下所示;
10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000
正如您所看到的,我已经在10:26:25.169
上部署了作业,但flink在2秒内绕过了窗口。是这样吗?
如果没有,windows是否像下面这样工作?;
10:26:25.169 - 10:26:27.169
10:26:27.169 - 10:26:29.169
哪一个是真的?当我使用event time
而不是processing time
时,这种行为会改变吗?
第二个问题
我想保留每个键的状态。为此,我可以使用richFlatMap函数或keyedProcessFunction。但我想知道,在应用窗口后,我可以使用上述功能管理状态吗?例如
// in this case i can manage state by key
ds.keyBy(_.field).process(new MyStateFunction)
// in this case, can i manage state after window for the same key?
ds.keyBy(keyExtractorWithTime)
.window(new MyWindowFunction)
.reduce(new myRedisFunction)
.process(new MyStateFunction)
对于第一个问题,它总是满2秒的间隔四舍五入,所以基本上就像你所描述的那样:
10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000
有一个offset
参数允许您在某种程度上控制这种行为。但基本上,虽然Flink实际上是在数据到达时创建窗口的,但startTime
和endTime
并不取决于数据何时到达,因此数据适合窗口,而不是相反。
更多信息可以在这里找到
对于第二个问题,ProcessWindowFunction
是键控函数,因此您可以在函数中使用键控状态,就像在标准ProcessFunction
中一样。
问题1:如果未指定offset
参数,flink将默认使用窗口大小的整数倍作为startTime
(endTime
=startTime
+windowSize
(。所以你要求的打击是正确的。
10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000
在flink中,startTime
将通过以下方式计算:
/**
* Method to get the window start for a timestamp.
*
* @param timestamp epoch millisecond to get the window start.
* @param offset The offset which window start would be shifted by.
* @param windowSize The size of the generated windows. windowSize.
* @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}
问题2:如果你想在Keyed Window之后管理状态,下面的方法可能有效。这样可以管理每个窗口的状态和reduce
函数结果。
DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}
有关更多详细信息,请点击此处。