Windows and States in Apache Flink



第一个问题

我想学习窗户的时间行为。假设我将使用Processing time每2秒处理一次数据,当前时间为10:26:25.169。此时,我部署了作业。

在这种情况下,是否会将每个时间窗口四舍五入为0、2、4秒,依此类推?如下所示;

  • 10:26:24.000 - 10:26:26.000
  • 10:26:26.000 - 10:26:28.000

正如您所看到的,我已经在10:26:25.169上部署了作业,但flink在2秒内绕过了窗口。是这样吗?

如果没有,windows是否像下面这样工作?;

  • 10:26:25.169 - 10:26:27.169
  • 10:26:27.169 - 10:26:29.169

哪一个是真的?当我使用event time而不是processing time时,这种行为会改变吗?

第二个问题

我想保留每个键的状态。为此,我可以使用richFlatMap函数或keyedProcessFunction。但我想知道,在应用窗口后,我可以使用上述功能管理状态吗?例如

// in this case i can manage state by key
ds.keyBy(_.field).process(new MyStateFunction)
// in this case, can i manage state after window for the same key? 
ds.keyBy(keyExtractorWithTime)
.window(new MyWindowFunction)
.reduce(new myRedisFunction)
.process(new MyStateFunction)

对于第一个问题,它总是满2秒的间隔四舍五入,所以基本上就像你所描述的那样:

10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000

有一个offset参数允许您在某种程度上控制这种行为。但基本上,虽然Flink实际上是在数据到达时创建窗口的,但startTimeendTime并不取决于数据何时到达,因此数据适合窗口,而不是相反。

更多信息可以在这里找到

对于第二个问题,ProcessWindowFunction是键控函数,因此您可以在函数中使用键控状态,就像在标准ProcessFunction中一样。

问题1:如果未指定offset参数,flink将默认使用窗口大小的整数倍作为startTime(endTime=startTime+windowSize(。所以你要求的打击是正确的。

10:26:24.000 - 10:26:26.000
10:26:26.000 - 10:26:28.000

在flink中,startTime将通过以下方式计算:

/**
* Method to get the window start for a timestamp.
*
* @param timestamp epoch millisecond to get the window start.
* @param offset The offset which window start would be shifted by.
* @param windowSize The size of the generated windows. windowSize.
* @return window start
*/
public static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
return timestamp - (timestamp - offset + windowSize) % windowSize;
}

问题2:如果你想在Keyed Window之后管理状态,下面的方法可能有效。这样可以管理每个窗口的状态和reduce函数结果。

DataStream<SensorReading> input = ...;
input
.keyBy(<key selector>)
.timeWindow(<duration>)
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
public SensorReading reduce(SensorReading r1, SensorReading r2) {
return r1.value() > r2.value() ? r2 : r1;
}
}
private static class MyProcessWindowFunction
extends ProcessWindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
public void process(String key,
Context context,
Iterable<SensorReading> minReadings,
Collector<Tuple2<Long, SensorReading>> out) {
SensorReading min = minReadings.iterator().next();
out.collect(new Tuple2<Long, SensorReading>(context.window().getStart(), min));
}
}

有关更多详细信息,请点击此处。

相关内容

  • 没有找到相关文章

最新更新