Flink 如何处理迭代循环中的时间戳



在 Flink 的迭代 DataStream 循环中如何处理时间戳?

例如,下面是 Flink 中一个简单的迭代循环示例,其中反馈循环的类型与输入流不同:

DataStream<MyInput> inputStream = env.addSource(new MyInputSourceFunction());
IterativeStream.ConnectedIterativeStreams<MyInput, MyFeedback> iterativeStream = inputStream.iterate().withFeedbackType(MyFeedback.class);
// define an output tag so we can emit feedback objects via a side output
final OutputTag<MyFeedback> outputTag = new OutputTag<MyFeedback>("feedback-output"){};
// now do some processing
SingleOutputStreamOperator<MyOutput> combinedStreams = iterativeStream.process(new CoProcessFunction<MyInput, MyFeedback, MyOutput>() {
    @Override
    public void processElement1(MyInput value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some processing of the stream of MyInput values
        // emit MyOutput values downstream by calling out.collect()
        out.collect(someInstanceOfMyOutput);
    }
    @Override
    public void processElement2(MyFeedback value, Context ctx, Collector<MyOutput> out) throws Exception {
        // do some more processing on the feedback classes
        // emit feedback items
        ctx.output(outputTag, someInstanceOfMyFeedback);
    }
});
iterativeStream.closeWith(combinedStreams.getSideOutput(outputTag));

我的问题围绕着 Flink 如何在反馈循环中使用时间戳:

  • ConnectedIterativeStreams中,Flink 如何处理输入对象在常规输入和反馈对象流中的排序?如果我将一个对象发射到反馈循环中,相对于常规的输入对象流,循环的头部何时会看到它?
  • 使用事件时间处理时行为如何变化?

> AFAICT,Flink 不对输入对象的顺序提供任何保证。我在尝试在 Flink 中对聚类算法使用迭代时遇到了这个问题,其中质心更新没有得到及时处理。我发现的唯一解决方案基本上是创建传入事件和质心更新的单个(联合(流,而不是使用共同流。

仅供参考,这个提议是为了解决迭代的一些缺点。

最新更新