我有下面的示例代码,假设MyCallable("B")
的执行时间超过1秒,而其他的执行速度超过1秒。因此,在调用Future.get()
的循环中,它将抛出一个TimeoutException
。
public static void main(String[] args) {
ExecutorService es = Executors.newFixedThreadPool(2);
List<Future<String>> futures = new ArrayList<Future<String>>();
futures.add(es.submit(new MyCallable("A")));
futures.add(es.submit(new MyCallable("B")));
futures.add(es.submit(new MyCallable("C")));
futures.add(es.submit(new MyCallable("D")));
futures.add(es.submit(new MyCallable("E")));
try {
for(Future<String> f : futures) {
try {
System.out.println("result " + f.get(1, TimeUnit.SECONDS));
}
catch (TimeoutException e) {
// how do I know which MyCallable() has timed out?
} catch (ExecutionException e) {
System.out.println(e.getMessage());
}
}
}
catch (InterruptedException e) {
e.printStackTrace();
}
finally {
es.shutdown();
}
}
正如预期的那样,每个MyCallable()实例都会执行,但对于超时的实例,我想执行一些错误处理,这需要知道哪个Callable
与哪个Future
相关联。
有没有这种关联的机制,或者由我的Callable
来处理它的call()
方法中的所有错误处理?
似乎可以简单地维护Map<Future<String>, Callable<String>>
而不是List<Future<String>>
,并以这种方式检索原始Callable。
如果你想变得非常聪明,你可以采用OO风格,扩展ThreadPoolExecutor并创建一个Future decorator类。我认为这可能有些过头了,但你可以这样做:
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureWithCallable<T> implements Future<T> {
private final Callable<T> callable;
private final Future<T> wrapped;
public FutureWithCallable(Future<T> wrapped, Callable<T> callable) {
this.callable = callable;
this.wrapped = wrapped;
}
public Callable<T> getCallable() {
return callable;
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
return wrapped.cancel(mayInterruptIfRunning);
}
@Override
public T get() throws InterruptedException, ExecutionException {
return wrapped.get();
}
@Override
public T get(long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
return wrapped.get(timeout, unit);
}
@Override
public boolean isCancelled() {
return wrapped.isCancelled();
}
@Override
public boolean isDone() {
return wrapped.isDone();
}
}
然后:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceWithCallable extends ThreadPoolExecutor {
public ExecutorServiceWithCallable(int corePoolSize, int maxPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maxPoolSize, keepAliveTime, unit, workQueue);
}
@Override
public <T> FutureWithCallable submit(Callable<T> callable) {
Future<T> future = super.submit(callable);
return new FutureWithCallable<T>(future, callable);
}
}
公共类TimeoutException扩展异常阻塞操作超时时引发异常。指定了超时的阻塞操作需要一种方法来指示超时已经发生。对于许多这样的操作,可以返回指示超时的值;当这不可能或不可取时,则应声明并抛出TimeoutException。