当其中一个下游挂起时,如何保持来自非并行源的广播?



下面的程序打印"1"几秒钟,然后挂起。s1s2的并行度为 4。最终,我的问题是如何使以下代码无限期地打印"1"而不更改运算符和源代码中的代码?

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStreamSource<Integer> numbers = env.addSource(new SourceFunction<Integer>() {
@Override
public synchronized void run(SourceContext<Integer> sourceContext) {
while (true) sourceContext.collect(0);
}
@Override
public void cancel() { }
});
DataStream<Integer> s1 = numbers.map(d -> d + 1);
DataStream<Integer> s2 = numbers.map(d -> {
while (true) Thread.yield();
});
s1.print();
s2.print();
env.execute();
}

也许我弄错了整个 Flink 故事,但我看不到一种正统的方法,可以从固有的非并行源到多个独立的并行管道进行多路复用,而不会阻塞最慢的管道。根据源代码,有一个普通的 for 循环广播(org.apache.flink.streaming.runtime.tasks.OperatorChain:630,v 1.5.0(,在背压过高的情况下,它会挂起执行SourceContext.collect的线程在监视器等待中,显然是由于下游运算符的缓冲区已满。

如果两个下游运算符真正独立,那么我建议为两个运算符实例化它们自己的源。

您的应用程序的问题在于,两个下游运算符通过 Flink 的处理保证隐式耦合。有了恰好一次和至少一次的处理保证,如果其中一个操作员挂起,源不能简单地继续向下游推送记录。它必须确保所有消费者至少看到一次所有元素,因此,如果其中一个消费者挂起,它也需要背压。

相关内容

  • 没有找到相关文章

最新更新