kafka源流的事件时间窗口



Kafka服务器中有一个主题。在程序中,我们将此主题作为流读取,并分配事件时间戳。然后在此流上执行窗口操作。但是这个程序不起作用。调试后,似乎没有执行WindowOperator的processWatermark方法。这是我的密码。

    DataStream<Tuple2<String, Long>> advertisement = env
            .addSource(new FlinkKafkaConsumer082<String>("advertisement", new SimpleStringSchema(), properties))
            .map(new MapFunction<String, Tuple2<String, Long>>() {
                private static final long serialVersionUID = -6564495005753073342L;
                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] splits = value.split(" ");
                    return new Tuple2<String, Long>(splits[0], Long.parseLong(splits[1]));
                }
            }).assignTimestamps(timestampExtractor);
    advertisement
            .keyBy(keySelector)
            .window(TumblingTimeWindows.of(Time.of(10, TimeUnit.SECONDS)))
            .apply(new WindowFunction<Tuple2<String,Long>, Integer, String, TimeWindow>() {
                private static final long serialVersionUID = 5151607280638477891L;
                @Override
                public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> values, Collector<Integer> out) throws Exception {
                    out.collect(Iterables.size(values));
                }
            }).print();

为什么会发生这种情况?如果我在"assignTimestamps(timestampExtractor)"之前加上"keyBy(keySelector)",那么程序就可以工作了。有人能帮忙解释一下原因吗?

您受到Flink中一个已知错误的影响:Flink-3121:水印转发不适用于不产生任何数据的源。

问题是FlinkKafkaConsumer的运行数量(很可能是CPU内核的数量(比如4个))多于分区(1个)。卡夫卡的消费者中只有一个在发射水印,其他消费者都在空转。

窗口操作员没有意识到这一点,正在等待来自所有消费者的水印。这就是为什么窗户永远不会触发的原因。

相关内容

  • 没有找到相关文章

最新更新