使用表API或SQL的Apache Flink滚动窗口时间偏移



任何人都知道如何使用时间偏移量进行滚动窗口操作-窗口大小为一天,时间偏移量基于时区以小时为单位。

我找到了使用DataStream API实现它的示例,想知道如何使用Table API/SQL实现它。

下面是我使用DataStream API的代码。

DataStream<Tuple2<String, Timestamp>> inputStreamWithTime = inputStream
.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<String, Timestamp>>() {
@Override
public long extractTimestamp(Tuple2<String, Timestamp> element, long previousElementTimestamp) {
return element.f1.getTime();
}
@Override
public Watermark checkAndGetNextWatermark(Tuple2<String, Timestamp> lastElement, long extractedTimestamp) {
return new Watermark(extractedTimestamp);
}
});
inputStreamWithTime
.keyBy(new KeySelector<Tuple2<String,Timestamp>, String>() {
@Override
public String getKey(Tuple2<String, Timestamp> in) throws Exception {
return in.f0;
}
})
.window(TumblingEventTimeWindows.of(Time.seconds(60L), Time.seconds(10L)))
.aggregate(new CountAggregate(), new ProcessTumblingWindowFunction())
.map((Tuple4<String, Long, Timestamp, Timestamp> value) -> {
return new Tuple3<String, Long, Timestamp, Timestamp>(value.f0, value.f1, value.f2);
})
.returns(Types.TUPLE(Types.STRING, Types.LONG, Types.SQL_TIMESTAMP))
.addSink(getSink());

提前谢谢。

遗憾的是,在表API/SQL中无法执行该窗口。时间窗口目前始终以API级别的UTC定义。

一个可能的解决方法是改变源连接器中的时间,以便UTC窗口提供正确的结果。但是,您需要在水槽连接器中反向移动它。当然,只有当你不在另一个应用程序中使用源代码时,这种破解才会奏效。

最新更新