基于事件时间的窗口不触发



我正在研究基于Flink事件时间的窗口。但是当我发送kafka消息程序不做窗口操作时。我做了文档所说的一切,但无法解决问题,任何帮助都将得到赞赏,提前感谢

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        environment.getConfig();
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        environment.setParallelism(1);
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id","event-group");
        FlinkKafkaConsumer<EventSalesQuantity> consumer = new FlinkKafkaConsumer<EventSalesQuantity>("EventTopic",new EventSerializationSchema(),props);
        DataStream<EventSalesQuantity> eventDataStream = environment.addSource(consumer);
        KeyedStream<EventSalesQuantity, String> keyedEventStream = eventDataStream.assignTimestampsAndWatermarks( new AssignerWithPeriodicWatermarksImpl()).
           keyBy(new KeySelector<EventSalesQuantity, String>() {
               @Override
               public String getKey(EventSalesQuantity eventSalesQuantity) throws Exception {
                   return  eventSalesQuantity.getDealer();
               }
           });
        DataStream<Tuple2<EventSalesQuantity,Integer>> eventSinkStream = keyedEventStream.timeWindow(Time.seconds(5)).aggregate(new AggregateImpl());
        eventSinkStream.addSink(new FlinkKafkaProducer<Tuple2<EventSalesQuantity, Integer>>("localhost:9092","SinkEventTopic",new EventSinkSerializationSchema()));
        eventSinkStream.print();
        environment.execute();
    }
}


public class AssignerWithPeriodicWatermarksImpl implements AssignerWithPeriodicWatermarks<EventSalesQuantity> {
    private final long maxOutOfOrderness = 3500; 
    private long currentMaxTimestamp;
    @Override
    public long extractTimestamp(EventSalesQuantity element, long previousElementTimestamp) {
        long timestamp = DateUtils.getDateFromString(element.getTransactionDate()).getTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
    @Override
    public Watermark getCurrentWatermark() {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }

"2019-06-21T09:43:01"2019-06-21T09:43:03">

我发送了 2 条带有这些时间戳的消息,但我没有得到输出。

  1. 您的事件时间窗口长度为 5 秒。包含这些事件的窗口在看到时间戳至少为 2019-06-21T09:43:05 的水印之前不会触发。
  2. 将 maxOutOfOrderness 设置为 3500 毫秒时,水印生成器不会生成足够大的水印来触发窗口,直到它看到时间戳至少为 2019-06-21T09:43:08.500 的事件。

相关内容

  • 没有找到相关文章

最新更新