似乎我在以下模式下使用asyncfunc时陷入僵局。
我已经能够复制此错误如下:
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class AsyncTest {
@Test
public void test() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<Integer> source = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7));
AsyncDataStream.orderedWait(source,
new AsyncFunction<Integer, String>(){
@Override
public void asyncInvoke(Integer integer, AsyncCollector<String> asyncCollector) throws Exception {
AsyncTest.getFuture(integer).whenComplete((t,m) -> {
if (m==null){
asyncCollector.collect(Collections.singleton(t));
return;
}
asyncCollector.collect(m);
});
}
}, 20000, TimeUnit.MILLISECONDS,5)
.returns(String.class)
.print();
env.execute("unit-test");
}
static CompletableFuture<String> getFuture(Integer input) {
return CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
}
if (input == 7){
System.out.println("Returning");
return "ok";
}
System.out.println("Waking up");
throw new RuntimeException("test");
},new ScheduledThreadPoolExecutor(10));
}
}
我认为,这个僵局可能来自以下事实:的基础 arrayqueue > orderdstreamelementqueue 首先填补了不完整的期货。
当发射器试图从该队列中窥视,因为没有未来完成,发射器会暂停其执行。
期货完成后,它们会异常完成,因此不会触发 oncompleteHandler 方法,该方法应该在 headisCompleted 上称为SignalAll (因此,唤醒了Emitter线程(
同时,由于队列已满, tryput droundStreamelement 返回false的任何呼叫都可以是可以是链条链接 oncompleteHandler 。
所以,似乎无法唤醒发射器线程。
这只是我的猜测。也许我弄错了。但是,当我本地运行上面的代码时,执行永远不会结束。
除了增加队列的能力和或工作的能力外,是否有任何方法可以克服这个问题?
您的分析是正确的。问题在于StreamElementQueueEntry
没有对其未来的出色完成做出反应。因此,每当发生超时或其他例外时,StreamElementQueueEntry
都无法正确设置为完成,并且Emitter
未通知有关新完成的条目。
问题已与Flink-6435解决了,它已经合并为Flink Master。