我有一个flink kafka消费者的流(kafka msgs流到主题上),我注意到我想解决的有趣行为。
当数据流中流中,如果在窗口"完整"之前停止,或者数据结束(几个窗口之后),并且没有到达窗口的末端,则管道的其余部分不会触发。
示例流:
env.addSource(kafkaConsumer)
.flatMap(new TokenMapper())
.keyBy("word")
.window(TumblingEventTimeWindows.of(Time.seconds(10L)))
.reduce(new CountTokens())
.flatMap(new ConvertToString())
.addSink(producer);
我正在使用flinkkafkaconsumer010将设置为Eventhactime的Env Timecharactimation。和消费者。
private static class PeriodicWatermarks implements AssignerWithPeriodicWatermarks<String>{
private long currentMaxTimestamp;
private final long maxOutOfOrderness;
public PeriodicWatermarksAuto(long maxOutOfOrderness){
this.maxOutOfOrderness = maxOutOfOrderness;
}
@Override
public Watermark getCurrentWatermark() {
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
@Override
public long extractTimestamp(String t, long l) {
// this should be the event timestamp
currentMaxTimestamp = l;
logger.info("TIMESTAMP: " + l);
return l;
}
}
如果我的窗口说10秒,而我的数据流仅包含8秒的数据(然后在一段时间内停止流式传输),则flatmap-> sink才能处理,直到将新的后期数据流入其中。
示例数据流处理问题:(每个x都是每秒数据)
xxxxxxxx(8secs)------(gap)--(later more data)xxxxx
^(not processed) (until I get here)^
类似地,例如,如果我有价值35秒的流数据(我的窗口是10秒),只有3个窗口的数据触发器,其余5秒的数据从未处理。
...xxxxxxxxxx(10secs)xxxxx(5secods)------(gap)--(later more data)xxxxx
(processed) ^(not processed) (until I get here)^
最后,如果我的窗口为10秒,而i 仅具有5秒的流数据,则flatmap->接收器永远不会发生。
我的问题是,如果我们一段时间后我们看不到数据,是否有办法触发窗口数据进行处理?
如果我的数据是现场直播的,我可以看到有无数据的伸展新数据进来了,我希望窗口时间通过后最后一个窗口的结果。
大声思考,这似乎是由于使用事件时间而不是处理时间,或者,我的水印未适当地生成最后一个窗口来实际触发的...不确定这两者都不确定吗?我认为这对任何人来说都是一个问题,如果您的流程结束时,最后位不会触发。我会说我可能会发送流程度的味精,但是如果蒸汽结束,这无济于事,因为源会破裂。
编辑:所以我更改为处理时间,并且确实正确地处理了最后一个窗口中的数据,所以我想EventTime毕竟是罪魁祸首,我认为我认为自定义触发器或正确的窗口水印可能是答案...
感谢您的帮助!
我将其留给后代,因为我想的是与水印有关的问题。Timestamper和Watermaker(来自分配的Timestampsandwatermarks)称呼为" GetCurrentWaterMark()",并且由于我将基于传入实体的水印设置为固定数字(他们的时间戳 - 最大偏移量 - 最大偏移量),直到看到一个新的授权。
我的解决方案是某种计时器,如果没有在可配置的时间内看到数据,则最终将水印推向了下一个窗口。我将无法处理非常潜在的数据,但我不希望这应该是一个问题。这是事件时间处理的预期行为。