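I just ran into a very strange problem: when I use EventTime with a timestamp and watermark assigner, I get no results at all from a streaming window join.
I'm using Kafka as the source for my data streams. I have tried both AscendingTimestampExtractor and a custom assigner implementing AssignerWithPeriodicWatermarks, as described in the Flink documentation. In my tests no watermarks were emitted and no join results were produced. If I switch to ProcessingTime with TumblingProcessingTimeWindows and no timestamp assigner at all, I get correct results.
My custom timestamp and watermark assigner code looks like this: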
FlinkKafkaConsumer09<String> myConsumer1 =
        new FlinkKafkaConsumer09<>(myTopic1, new SimpleStringSchema(), props);
myConsumer1.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
FlinkKafkaConsumer09<String> myConsumer2 =
        new FlinkKafkaConsumer09<>(myTopic2, new SimpleStringSchema(), props);
myConsumer2.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
...
public static class MyTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks<String> {
    // Highest event timestamp (ms) extracted so far; starts at 0.
    private long currentMaxTimestamp;

    @Override
    public long extractTimestamp(String element, long previousElementTimestamp) {
        long timestamp = myFunctionToGetMillisFromString(element);
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    // Called periodically by Flink; the watermark trails the max timestamp by 1 ms.
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - 1L);
    }
}
...
DataStream<myPOJO1> stream1 = env.addSource(myConsumer1).map(new MyMapper1());
DataStream<myPOJO2> stream2 = env.addSource(myConsumer2).map(new MyMapper2());
stream1.join(stream2)
        .where(new KeySelector1())
        .equalTo(new KeySelector2())
        .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
        .apply(new JoinFunction<AdClick, GameCreate, TransferResult>() {...});
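To see why the join above can stay silent, here is a plain-Java model of the setup with no Flink dependency (all class and method names are hypothetical). It replays the assigner logic (watermark = highest timestamp seen - 1, starting from `-1` because the field defaults to `0`), takes the minimum of both inputs' watermarks as the join's event-time clock (Flink's rule for operators with several inputs), and fires a tumbling window `[start, start + size)` only once that clock reaches `start + size - 1`. It assumes a zero window offset and non-negative timestamps; it is a sketch of the timing rules, not of Flink's implementation.

```java
/**
 * Hypothetical plain-Java model (no Flink dependency): one periodic assigner
 * per input, a join clock equal to the minimum of both watermarks, and a
 * tumbling window that fires once that clock reaches its last millisecond.
 */
class EventTimeJoinSketch {

    /** Replays MyTimestampsAndWatermarks: watermark = max timestamp seen - 1. */
    static class PeriodicAssigner {
        private long currentMaxTimestamp; // 0 by default, as in the original

        long extractTimestamp(long timestampMillis) {
            currentMaxTimestamp = Math.max(timestampMillis, currentMaxTimestamp);
            return timestampMillis;
        }

        long currentWatermark() {
            return currentMaxTimestamp - 1L;
        }
    }

    /** Start of the tumbling window containing the timestamp (zero offset, ts >= 0). */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    /** Window [start, start + size) fires once the watermark reaches start + size - 1. */
    static boolean windowFires(long start, long windowSizeMillis, long watermark) {
        return watermark >= start + windowSizeMillis - 1;
    }

    public static void main(String[] args) {
        long size = 5_000L; // stands in for Time.seconds(5)
        PeriodicAssigner input1 = new PeriodicAssigner();
        PeriodicAssigner input2 = new PeriodicAssigner();

        input1.extractTimestamp(7_500L);  // falls into window [5000, 10000)
        input1.extractTimestamp(20_000L); // input1 has moved well past that window
        long start = windowStart(7_500L, size);

        // input2 has received nothing yet, so its watermark is still -1.
        long joinWatermark = Math.min(input1.currentWatermark(), input2.currentWatermark());
        System.out.println(joinWatermark + " " + windowFires(start, size, joinWatermark)); // -1 false

        input2.extractTimestamp(12_000L); // input2 catches up
        joinWatermark = Math.min(input1.currentWatermark(), input2.currentWatermark());
        System.out.println(joinWatermark + " " + windowFires(start, size, joinWatermark)); // 11999 true
    }
}
```

The Flink Kafka connector documentation describes the consumer as generating watermarks per Kafka partition and merging them by taking the minimum as well, so a partition that receives no data can stall a source in the same way the empty `input2` stalls the join here.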
My AscendingTimestampExtractor code looks like this:
FlinkKafkaConsumer09<String> myConsumer1 =
        new FlinkKafkaConsumer09<>(myTopic1, new SimpleStringSchema(), props);
myConsumer1.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
    @Override
    public long extractAscendingTimestamp(String element) {
        return myFunctionToGetMillisFromString(element);
    }
});
FlinkKafkaConsumer09<String> myConsumer2 =
        new FlinkKafkaConsumer09<>(myTopic2, new SimpleStringSchema(), props);
myConsumer2.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
    @Override
    public long extractAscendingTimestamp(String element) {
        return myFunctionToGetMillisFromString(element);
    }
});
...
DataStream<myPOJO1> stream1 = env.addSource(myConsumer1).map(new MyMapper1());
DataStream<myPOJO2> stream2 = env.addSource(myConsumer2).map(new MyMapper2());
stream1.join(stream2)
        .where(new KeySelector1())
        .equalTo(new KeySelector2())
        .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
        .apply(new JoinFunction<AdClick, GameCreate, TransferResult>() {...});
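The AscendingTimestampExtractor variant uses similar watermark logic but starts from a different place. The sketch below is a plain-Java approximation (hypothetical class, no Flink dependency) of how that extractor is generally described: it assumes timestamps never decrease, lets the watermark trail the latest timestamp by 1 ms, and reports no event-time progress (`Long.MIN_VALUE`) until the first element arrives. Flink's real class handles out-of-order elements through its own violation handling; the sketch only counts them.

```java
/**
 * Hypothetical plain-Java approximation of an ascending-timestamp assigner:
 * assumes non-decreasing timestamps and lets the watermark trail the latest
 * timestamp by 1 ms.
 */
class AscendingAssignerSketch {
    // Long.MIN_VALUE means "no element seen yet", i.e. no event time has passed.
    private long currentTimestamp = Long.MIN_VALUE;
    private int violations;

    long extract(long timestampMillis) {
        if (timestampMillis >= currentTimestamp) {
            currentTimestamp = timestampMillis;
        } else {
            // Out-of-order element: Flink treats this as a monotony violation;
            // this sketch just counts it and does not move the watermark back.
            violations++;
        }
        return timestampMillis;
    }

    long currentWatermark() {
        return currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1;
    }

    int violations() {
        return violations;
    }

    public static void main(String[] args) {
        AscendingAssignerSketch a = new AscendingAssignerSketch();
        System.out.println(a.currentWatermark() == Long.MIN_VALUE); // true: nothing received yet
        a.extract(1_000L);
        a.extract(3_000L);
        a.extract(2_000L); // breaks the ascending assumption
        System.out.println(a.currentWatermark() + " " + a.violations()); // 2999 1
    }
}
```

So an input that never receives data (or, with the Kafka consumer's per-partition watermarking, a single empty partition) keeps the minimum watermark at `Long.MIN_VALUE`, and no event-time window downstream can fire.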
Thanks for your help!
I ran into the same problem. It turned out to be a rather silly mistake, and I found the solution here:
When you write:
myConsumer1.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
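it creates a new data stream instead of modifying the existing one, and you are not storing that new stream in a variable. So the bottom line is: store the result in a new variable and apply the join to that stream (the one that actually has the timestamps and watermarks assigned):
myConsumer3 = myConsumer1.assignTimestampsAndWatermarks(...);
myConsumer4 = myConsumer2.assignTimestampsAndWatermarks(...);
and then use myConsumer3/myConsumer4. That should fix it.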
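The pitfall described in this answer is the general Java one of calling a method that returns a new object and then discarding the result. `DataStream.assignTimestampsAndWatermarks(...)`, for example, returns a new `SingleOutputStreamOperator` and leaves the stream it was called on unchanged. The plain-Java sketch below reproduces that pattern with a hypothetical `StreamHandle` class (not a Flink type): only the returned handle carries the watermark setting.

```java
/**
 * Hypothetical stand-in (not a Flink type) for a stream whose configuration
 * method returns a NEW handle instead of mutating the receiver.
 */
final class StreamHandle {
    private final boolean hasWatermarks;

    StreamHandle(boolean hasWatermarks) {
        this.hasWatermarks = hasWatermarks;
    }

    /** Returns a new handle with watermarks; this handle is left unchanged. */
    StreamHandle withWatermarks() {
        return new StreamHandle(true);
    }

    boolean hasWatermarks() {
        return hasWatermarks;
    }

    public static void main(String[] args) {
        StreamHandle stream1 = new StreamHandle(false);

        stream1.withWatermarks();                        // result discarded
        System.out.println(stream1.hasWatermarks());     // false: a join on stream1 sees no watermarks

        StreamHandle stream3 = stream1.withWatermarks(); // keep the returned handle
        System.out.println(stream3.hasWatermarks());     // true: build the join on this one
    }
}
```

In Flink code the equivalent is keeping the returned stream, e.g. `DataStream<String> withTimestamps = env.addSource(myConsumer1).assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());`, and building the join on `withTimestamps`.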