Flink Timer不按时执行



这是一个后续问题:触发状态当时

我正在将每个传入元素的状态存储在流中,并且在计时器关闭后,我删除了状态。这样我就可以防止对重复处理进行处理,直到元素计时,之后我可以再次处理相同的元素。我

我已经为测试计时器编写了以下代码,但是在所有3个元素通过第一个ProcessFunction之后,计时器似乎是触发的。

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    streamEnv.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    streamEnv.setParallelism(12);
    List<Tuple2<String, String>> inputList = new ArrayList<>();
    inputList.add(new Tuple2<>("Test", "test"));
    inputList.add(new Tuple2<>("Test", "test"));
    inputList.add(new Tuple2<>("Test", "test"));
    streamEnv.fromCollection(inputList).keyBy(0)
            .process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                ValueState<Integer> occur;
                @Override
                public void open(Configuration parameters) throws Exception {
                    occur = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("occurs", Integer.class, 0));
                }
                @Override
                public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    if (occur.value() < 2) {
                        occur.update(occur.value() + 1);
                        out.collect(value);
                        LOGGER.info("[TEST] Outputting Tuple {}", value);
                    }
                    else {
                        Thread.sleep(10000);
                        LOGGER.info("[TEST] Outputting Tuple {}", value);
                        out.collect(value);
                    }
                }
            })
            .keyBy(0)
            .process(new ProcessFunction<Tuple2<String, String>, Tuple2<String, String>>() {
                ValueState<Tuple2<String, String>> storedTuple;
                @Override
                public void open(Configuration parameters) throws Exception {
                    storedTuple = getRuntimeContext().getState(new ValueStateDescriptor<>("storedTuple",
                            TypeInformation.of(new TypeHint<Tuple2<String, String>>() {})));
                }
                @Override
                public void processElement(Tuple2<String, String> value, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    Tuple2<String, String> stored = storedTuple.value();
                    if (stored == null) {
                        LOGGER.info("[TEST] Storing Tuple {}", value);
                        storedTuple.update(value);
                        out.collect(value);
                        ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 6000);
                    }
                }
            }
            @Override
                public void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
                    LOGGER.info("[TEST] Removing Tuple {}", storedTuple.value());
                    storedTuple.clear();
                }    
            )
            .addSink(new CollectSink());
    streamEnv.execute("Testing");
    for (Tuple2<String, String> tup: CollectSink.values) {
        System.out.println(tup);
    }
}
private static class CollectSink implements SinkFunction<Tuple2<String, String>> {
    static final List<Tuple2<String, String>> values = new ArrayList<>();
    @Override
    public synchronized void invoke(Tuple2<String, String> value) throws Exception {
        values.add(value);
    }
}

我有一个带有3个重复元素的输入列表。在第一个ProcessFunction中,我将前两个元素发送给第三个元素,但将第三元素延迟10秒。

在第二个ProcessFunction中,它根据状态是否存储为其过滤元素。正如预期的那样,第一个元素被存储并继续发送,第二个元素并不是国家已经存在的。对于第一个元素,除了发送它之外,我还将计时器设置为6秒,以便在计时器触发后清除状态。

现在,第三个元素是在10秒后发送的,这意味着6秒触发器应该已经清除了状态。但是,在触发计时器之前,也正在处理第三个元素。我还可以将输出视为仅包含1个元组副本,即使我期望2份副本。

我添加了一些日志记录以更好地了解执行时间。

[2019-02-19 14:11:48,891] [Process (1/12)] INFO  FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:48,891] [Process (1/12)] INFO  FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:48,943] [Process -> Sink: Unnamed (1/12)] INFO  FlinkTest - [TEST] Storing Tuple (Test,test)
[2019-02-19 14:11:58,891] [Process (1/12)] INFO  FlinkTest - [TEST] Outputting Tuple (Test,test)
[2019-02-19 14:11:58,896] [Process -> Sink: Unnamed (1/12)] INFO  FlinkTest - [TEST] Removing Tuple (Test,test)

您可以看到前两个元组按预期一起排放,然后延迟10秒,然后发出了第三个元组。现在Removing Tuple发生在10秒之后,即使触发了第一个元组的6秒后发生的。

事件时间计时器直到处理计时器中指定的时间大的水印才会发射。直到第三次事件处理后才能发生这样的水印。此外,随着摄入时间,使用周期性水印发生器生成水印,默认情况下每200毫秒插入流中。

注意:在Flink 1.4.0之前,从处理时间计时器调用时,ProcessFunction.ontimer()方法将当前的处理时间设置为事件时间时间戳。这种行为非常微妙,用户可能不会注意到。好吧,这是有害的,因为处理时间时间戳是不确定的,并且与水印不符。此外,用户实施的逻辑取决于这个错误的时间戳,这是很可能是明显的错误。因此,我们决定解决它。升级到1.4.0后,使用此不正确的事件时间戳记的Flink作业将失败,用户应将其工作调整为正确的逻辑。

https://ci.apache.org/projects/flink/flink/flink-docs-stable/dev/stream/process_function.html

相关内容

  • 没有找到相关文章

最新更新