OrderedStreamelementqueue-潜在的僵局



似乎我在以下模式下使用asyncfunc时陷入僵局。

我已经能够复制此错误如下:

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
import org.junit.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class AsyncTest {
    @Test
    public void test() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<Integer> source = env.fromCollection(Arrays.asList(1,2,3,4,5,6,7));
        AsyncDataStream.orderedWait(source,
                new AsyncFunction<Integer, String>(){
                    @Override
                    public void asyncInvoke(Integer integer, AsyncCollector<String> asyncCollector) throws Exception {
                           AsyncTest.getFuture(integer).whenComplete((t,m) -> {
                               if (m==null){
                                   asyncCollector.collect(Collections.singleton(t));
                                   return;
                               }
                               asyncCollector.collect(m);
                           });
                    }
                }, 20000, TimeUnit.MILLISECONDS,5)
                .returns(String.class)
                .print();
        env.execute("unit-test");
    }
    static CompletableFuture<String> getFuture(Integer input) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
            }
            if (input == 7){
                System.out.println("Returning");
                return "ok";
            }
            System.out.println("Waking up");
            throw new RuntimeException("test");
        },new ScheduledThreadPoolExecutor(10));
    }
}

我认为,这个僵局可能来自以下事实:的基础 arrayqueue > orderdstreamelementqueue 首先填补了不完整的期货。

发射器试图从该队列中窥视,因为没有未来完成,发射器会暂停其执行。

期货完成后,它们会异常完成,因此不会触发 oncompleteHandler 方法,该方法应该在 headisCompleted 上称为SignalAll (因此,唤醒了Emitter线程(

同时,由于队列已满, tryput droundStreamelement 返回false的任何呼叫都可以是可以是链条链接 oncompleteHandler

所以,似乎无法唤醒发射器线程。

这只是我的猜测。也许我弄错了。但是,当我本地运行上面的代码时,执行永远不会结束。

除了增加队列的能力和或工作的能力外,是否有任何方法可以克服这个问题?

您的分析是正确的。问题在于StreamElementQueueEntry没有对其未来的出色完成做出反应。因此,每当发生超时或其他例外时,StreamElementQueueEntry都无法正确设置为完成,并且Emitter未通知有关新完成的条目。

问题已与Flink-6435解决了,它已经合并为Flink Master。

相关内容

  • 没有找到相关文章

最新更新