我正试图在连续数据流上创建一个翻滚窗口,并在窗口中创建聚合。但是由于某些原因,getResult((没有被调用。
public class MyAggregator implements AggregateFunction<Event, MyMetrics, MyMetrics> {
@Override
public MyMetrics createAccumulator() {
return new MyMetrics(0L, 0L);
}
@Override
public MyMetrics add(Event value, MyMetrics accumulator) {
Instant previousValue = ...;
if (previousValue != null) {
Long myWay = ...;
accumulator.setMyWay(myWay);
}
return accumulator;
}
@Override
public MyMetrics getResult(MyMetrics accumulator) {
System.out.println("Inside getResult()");
return accumulator;
}
@Override
public MyMetrics merge(MyMetrics acc1, MyMetrics acc2) {
return new MyMetrics(
acc1.getMyWay() + acc2.getMyWay());
}
}
注意:event.getClientTime((返回一个Instant对象。
private WatermarkStrategy getWatermarkStrategy() {
return WatermarkStrategy
.<MyEvent>forBoundedOutOfOrderness(Duration.ofMinutes(10))
.withTimestampAssigner(
(event, timestamp) ->
event.getClientTime().toEpochMilli()
);
}
public static void main(String[] args) {
DataStream<MyEvent> watermarkedData = actuals
.assignTimestampsAndWatermarks(
getWatermarkStrategy()
).name("addWatermark");
final OutputTag<MyEvent> lateOutputTag = new OutputTag<MyEvent>("late-data"){};
SingleOutputStreamOperator<OutputModel> output_data = watermarkedData
.keyBy("input_key")
.window(TumblingEventTimeWindows.of(Time.hours(1)))
.sideOutputLateData(lateOutputTag)
.aggregate(
new MyAggregator(),
).name("AggregationRollUp");
output_data.addSink(new PrintSinkFunction<>());
}
任何关于我在这里遗漏了什么的线索都会有所帮助。
首先检查数据的定时,看看它是否符合窗口触发条件
第二个可能是你可以通过将窗口大小从1小时减少到1分钟,并将水印区域从10分钟减少到30秒来进行测试