下面的程序打印"1"几秒钟,然后挂起。s1
和s2
的并行度为 4。最终,我的问题是如何使以下代码无限期地打印"1"而不更改运算符和源代码中的代码?
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
DataStreamSource<Integer> numbers = env.addSource(new SourceFunction<Integer>() {
@Override
public synchronized void run(SourceContext<Integer> sourceContext) {
while (true) sourceContext.collect(0);
}
@Override
public void cancel() { }
});
DataStream<Integer> s1 = numbers.map(d -> d + 1);
DataStream<Integer> s2 = numbers.map(d -> {
while (true) Thread.yield();
});
s1.print();
s2.print();
env.execute();
}
也许我弄错了整个 Flink 故事,但我看不到一种正统的方法,可以从固有的非并行源到多个独立的并行管道进行多路复用,而不会阻塞最慢的管道。根据源代码,有一个普通的 for 循环广播(org.apache.flink.streaming.runtime.tasks.OperatorChain:630
,v 1.5.0(,在背压过高的情况下,它会挂起执行SourceContext.collect
的线程在监视器等待中,显然是由于下游运算符的缓冲区已满。
如果两个下游运算符真正独立,那么我建议为两个运算符实例化它们自己的源。
您的应用程序的问题在于,两个下游运算符通过 Flink 的处理保证隐式耦合。有了恰好一次和至少一次的处理保证,如果其中一个操作员挂起,源不能简单地继续向下游推送记录。它必须确保所有消费者至少看到一次所有元素,因此,如果其中一个消费者挂起,它也需要背压。