如何使用ExecutorService和Futures处理TimeoutException



我有下面的示例代码,假设MyCallable("B")的执行时间超过1秒,而其他的执行速度超过1秒。因此,在调用Future.get()的循环中,它将抛出一个TimeoutException

public static void main(String[] args) {
    ExecutorService es = Executors.newFixedThreadPool(2);
    List<Future<String>> futures = new ArrayList<Future<String>>();
    futures.add(es.submit(new MyCallable("A")));
    futures.add(es.submit(new MyCallable("B")));
    futures.add(es.submit(new MyCallable("C")));
    futures.add(es.submit(new MyCallable("D")));
    futures.add(es.submit(new MyCallable("E")));
    try {
        for(Future<String> f  : futures) {
            try {
                System.out.println("result " + f.get(1, TimeUnit.SECONDS));
            }
            catch (TimeoutException e) {
                // how do I know which MyCallable() has timed out?
            } catch (ExecutionException e) {
                System.out.println(e.getMessage());
            }
        }
    }
    catch (InterruptedException e) {
        e.printStackTrace();
    }
    finally {
        es.shutdown();
    }
}

正如预期的那样,每个MyCallable()实例都会执行,但对于超时的实例,我想执行一些错误处理,这需要知道哪个Callable与哪个Future相关联。

有没有这种关联的机制,或者由我的Callable来处理它的call()方法中的所有错误处理?

似乎可以简单地维护Map<Future<String>, Callable<String>>而不是List<Future<String>>,并以这种方式检索原始Callable。

如果你想变得非常聪明,你可以采用OO风格,扩展ThreadPoolExecutor并创建一个Future decorator类。我认为这可能有些过头了,但你可以这样做:

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FutureWithCallable<T> implements Future<T> {
    private final Callable<T> callable;
    private final Future<T> wrapped;
    public FutureWithCallable(Future<T> wrapped, Callable<T> callable) {
        this.callable = callable;
        this.wrapped = wrapped;
    }
    public Callable<T> getCallable() {
        return callable;
    }
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        return wrapped.cancel(mayInterruptIfRunning);
    }
    @Override
    public T get() throws InterruptedException, ExecutionException {
        return wrapped.get();
    }
    @Override
    public T get(long timeout, TimeUnit unit) throws InterruptedException,
            ExecutionException, TimeoutException {
        return wrapped.get(timeout, unit);
    }
    @Override
    public boolean isCancelled() {
        return wrapped.isCancelled();
    }
    @Override
    public boolean isDone() {
        return wrapped.isDone();
    }
}

然后:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
    public class ExecutorServiceWithCallable extends ThreadPoolExecutor {
        public ExecutorServiceWithCallable(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
        }
        @Override
        public <T> FutureWithCallable submit(Callable<T> callable) {
            Future<T> future = super.submit(callable);
            return new FutureWithCallable<T>(future, callable);
        }
    }

公共类TimeoutException扩展异常阻塞操作超时时引发异常。指定了超时的阻塞操作需要一种方法来指示超时已经发生。对于许多这样的操作,可以返回指示超时的值;当这不可能或不可取时,则应声明并抛出TimeoutException。

相关内容

  • 没有找到相关文章

最新更新