Flink window join does not work when using event time and a timestamp assigner



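I just ran into a very strange problem: when I use EventTime with a timestamp and watermark assigner, I cannot get any results from a stream window join.

I am using Kafka as my data stream source. I tried both AscendingTimestampExtractor and a custom assigner that implements AssignerWithPeriodicWatermarks, as described in the Flink documentation. In my tests, no watermarks were emitted and no join results were produced. If I switch to ProcessingTime with TumblingProcessingTimeWindows and no timestamp assigner, I get the correct results.

My custom timestamp and watermark assigner code looks like this: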

FlinkKafkaConsumer09<String> myConsumer1 =
                new FlinkKafkaConsumer09<>(myTopic1, new SimpleStringSchema(), props);
myConsumer1.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
FlinkKafkaConsumer09<String> myConsumer2 =
                new FlinkKafkaConsumer09<>(myTopic2, new SimpleStringSchema(), props);
myConsumer2.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
...
public static class MyTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks<String> {
        private long currentMaxTimestamp;
        @Override
        public long extractTimestamp(String element, long previousElementTimestamp) {
            long timestamp = myFunctionToGetMillisFromString(element);
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }
        @Override
        public Watermark getCurrentWatermark() {
            return new Watermark(currentMaxTimestamp - 1L);
        }
}
...
DataStream<myPOJO1> stream1 = env.addSource(myConsumer1).map(new MyMapper1());
DataStream<myPOJO2> stream2 = env.addSource(myConsumer2).map(new MyMapper2());
stream1.join(stream2)
    .where(new KeySelector1())
    .equalTo(new KeySelector2())
    .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
    .apply(new JoinFunction<AdClick, GameCreate, TransferResult>() {...});
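
To make the assigner's behaviour concrete, here is a minimal plain-Java sketch (a toy model, not Flink code) of the bookkeeping in MyTimestampsAndWatermarks and of when a tumbling event-time window is allowed to fire. The class and method names (WatermarkSketch, observe, canFire, combinedWatermark) are hypothetical and exist only for this illustration. It assumes non-negative timestamps, no window offset, and that an operator with two inputs follows the smaller of its two input watermarks.

```java
// Toy model, not Flink code: mirrors the periodic assigner in the question
// and the rule that an event-time window can fire once the watermark
// reaches the window's last timestamp (end - 1).
final class WatermarkSketch {
    private long currentMaxTimestamp; // starts at 0, as in the assigner in the question

    // Same bookkeeping as extractTimestamp: remember the largest timestamp seen.
    long observe(long timestamp) {
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }

    // Same rule as getCurrentWatermark: one millisecond behind the maximum.
    long currentWatermark() {
        return currentMaxTimestamp - 1L;
    }

    // Start of the tumbling window containing a non-negative timestamp (no offset).
    static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - (timestamp % sizeMillis);
    }

    // A window [start, start + size) can fire once the watermark reaches start + size - 1.
    static boolean canFire(long windowStart, long sizeMillis, long watermark) {
        return watermark >= windowStart + sizeMillis - 1;
    }

    // Assumption: a two-input operator such as a join follows the slower input.
    static long combinedWatermark(long left, long right) {
        return Math.min(left, right);
    }

    public static void main(String[] args) {
        long size = 5_000L;
        WatermarkSketch left = new WatermarkSketch();
        WatermarkSketch right = new WatermarkSketch();

        left.observe(1_000L);
        left.observe(6_000L);  // left watermark is now 5999
        right.observe(2_000L); // right watermark is only 1999

        long wm = combinedWatermark(left.currentWatermark(), right.currentWatermark());
        System.out.println(canFire(windowStart(1_000L, size), size, wm)); // false

        right.observe(5_000L); // right watermark advances to 4999
        wm = combinedWatermark(left.currentWatermark(), right.currentWatermark());
        System.out.println(canFire(windowStart(1_000L, size), size, wm)); // true
    }
}
```

In this model the window [0, 5000) stays closed until both inputs have advanced far enough. If no watermarks reach the join at all, no event-time window can ever fire, which would match the symptom described above.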

My AscendingTimestampExtractor code looks like this:

FlinkKafkaConsumer09<String> myConsumer1 =
                new FlinkKafkaConsumer09<>(myTopic1, new SimpleStringSchema(), props);
myConsumer1.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
    @Override
    public long extractAscendingTimestamp(String element) {
        return myFunctionToGetMillisFromString(element);
    }
});
FlinkKafkaConsumer09<String> myConsumer2 =
                new FlinkKafkaConsumer09<>(myTopic2, new SimpleStringSchema(), props);
myConsumer2.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<String>() {
    @Override
    public long extractAscendingTimestamp(String element) {
        return myFunctionToGetMillisFromString(element);
    }
});
...
DataStream<myPOJO1> stream1 = env.addSource(myConsumer1).map(new MyMapper1());
DataStream<myPOJO2> stream2 = env.addSource(myConsumer2).map(new MyMapper2());
stream1.join(stream2)
    .where(new KeySelector1())
    .equalTo(new KeySelector2())
    .window(TumblingEventTimeWindows.of(Time.seconds(windowSize)))
    .apply(new JoinFunction<AdClick, GameCreate, TransferResult>() {...});

Thanks for your help!

I ran into the same problem. It turned out to be a fairly silly mistake, and this is the solution I found.

When you write:

myConsumer1.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

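it creates a new data stream instead of modifying the existing one, and you are not storing the result in a variable. So the bottom line is:

Store the result in a new data stream and apply the join to that stream (the one that has the timestamps and watermarks assigned).

myConsumer3 = myConsumer1.assign***
myConsumer4 = myConsumer2.assign***

Then use myConsumer3/myConsumer4, and it will work.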
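
The pitfall described in this answer can be illustrated with a small plain-Java sketch. ToyStream is a hypothetical stand-in written for this example, not a Flink class, and it only assumes what the answer states: that the assigning call returns a new object and leaves the receiver untouched. Whether a given Flink type behaves this way should be checked against its API documentation.

```java
import java.util.function.ToLongFunction;

// Hypothetical stand-in for a stream type whose operations return a new object.
final class ToyStream {
    // null means "no event-time timestamps assigned"
    private final ToLongFunction<String> timestampExtractor;

    ToyStream() {
        this(null);
    }

    private ToyStream(ToLongFunction<String> extractor) {
        this.timestampExtractor = extractor;
    }

    // Returns a NEW stream that carries the extractor; the receiver is unchanged.
    ToyStream assignTimestamps(ToLongFunction<String> extractor) {
        return new ToyStream(extractor);
    }

    boolean hasTimestamps() {
        return timestampExtractor != null;
    }

    public static void main(String[] args) {
        ToyStream source = new ToyStream();

        // Result discarded: 'source' still has no timestamps.
        source.assignTimestamps(Long::parseLong);
        System.out.println(source.hasTimestamps()); // false

        // Result kept: downstream operations must use the returned object.
        ToyStream withTimestamps = source.assignTimestamps(Long::parseLong);
        System.out.println(withTimestamps.hasTimestamps()); // true
    }
}
```

Under that assumption, the join has to be built from the returned objects rather than from the original variables, which is what the myConsumer3/myConsumer4 suggestion amounts to.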
