FLINK:窗口不会在流的末尾处理数据



我有一个flink kafka消费者的流(kafka msgs流到主题上),我注意到我想解决的有趣行为。

当数据流中流中,如果在窗口"完整"之前停止,或者数据结束(几个窗口之后),并且没有到达窗口的末端,则管道的其余部分不会触发。

示例流:

    env.addSource(kafkaConsumer)
       .flatMap(new TokenMapper())
       .keyBy("word")
       .window(TumblingEventTimeWindows.of(Time.seconds(10L)))
       .reduce(new CountTokens())
       .flatMap(new ConvertToString())
       .addSink(producer);

我正在使用flinkkafkaconsumer010将设置为Eventhactime的Env Timecharactimation。和消费者。

private static class PeriodicWatermarks implements   AssignerWithPeriodicWatermarks<String>{
    private long currentMaxTimestamp;
    private final long maxOutOfOrderness;
    public PeriodicWatermarksAuto(long maxOutOfOrderness){
        this.maxOutOfOrderness = maxOutOfOrderness;
    }
    @Override
    public Watermark getCurrentWatermark() {
         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
    @Override
    public long extractTimestamp(String t, long l) {
        // this should be the event timestamp
        currentMaxTimestamp = l;
        logger.info("TIMESTAMP: " + l);
        return l;
    }
}

如果我的窗口说10秒,而我的数据流仅包含8秒的数据(然后在一段时间内停止流式传输),则flatmap-> sink才能处理,直到将新的后期数据流入其中。

示例数据流处理问题:(每个x都是每秒数据)

      xxxxxxxx(8secs)------(gap)--(later more data)xxxxx
      ^(not processed)           (until I get here)^

类似地,例如,如果我有价值35秒的流数据(我的窗口是10秒),只有3个窗口的数据触发器,其余5秒的数据从未处理。

     ...xxxxxxxxxx(10secs)xxxxx(5secods)------(gap)--(later more data)xxxxx
         (processed)        ^(not processed)          (until I get here)^

最后,如果我的窗口为10秒,而i 仅具有5秒的流数据,则flatmap->接收器永远不会发生。

我的问题是,如果我们一段时间后我们看不到数据,是否有办法触发窗口数据进行处理?

如果我的数据是现场直播的,我可以看到有无数据的伸展新数据进来了,我希望窗口时间通过后最后一个窗口的结果。

大声思考,这似乎是由于使用事件时间而不是处理时间,或者,我的水印未适当地生成最后一个窗口来实际触发的...不确定这两者都不确定吗?我认为这对任何人来说都是一个问题,如果您的流程结束时,最后位不会触发。我会说我可能会发送流程度的味精,但是如果蒸汽结束,这无济于事,因为源会破裂。

编辑:所以我更改为处理时间,并且确实正确地处理了最后一个窗口中的数据,所以我想EventTime毕竟是罪魁祸首,我认为我认为自定义触发器或正确的窗口水印可能是答案...

感谢您的帮助!

我将其留给后代,因为我想的是与水印有关的问题。Timestamper和Watermaker(来自分配的Timestampsandwatermarks)称呼为" GetCurrentWaterMark()",并且由于我将基于传入实体的水印设置为固定数字(他们的时间戳 - 最大偏移量 - 最大偏移量),直到看到一个新的授权。

我的解决方案是某种计时器,如果没有在可配置的时间内看到数据,则最终将水印推向了下一个窗口。我将无法处理非常潜在的数据,但我不希望这应该是一个问题。这是事件时间处理的预期行为。

相关内容

  • 没有找到相关文章

最新更新