我在这里的代码中看到很多
executorService.submit(() -> {
// do stuff
}).get();
…我想知道为什么这样使用executorService,提交一些你会立即得到的东西?
调用直接get()
强制ExecutorService
处理"做某事";在执行器服务的线程池中,而不是在本地线程中,例如,如果分配给它的最大线程数很小,尽管有大量线程正在运行以处理请求,但可能会阻塞处理。
ExecutorService executorService = Executors.newSingleThreadExecutor();
和从一个HttpRequestHandler
调用的示例代码。
通过使用直接get()
处理"做东西";将序列化,这可能是理想的或需求,尽管在各自的线程中同时处理许多请求。
没有executorService.submit(...).get()
的包装,处理"做东西";将在请求处理程序的线程中并行完成,这可能是好的,也可能会导致问题,如果"做事情";如果并行性是无界的,则在某种程度上特别昂贵或受限。
并行性可以限制/限定而不是消除,如下所示:
ExecutorService executorService = Executors.newFixedThreadPool(3);
将限制处理(例如)最多3个并发处理"做事情",尽管有大量请求正在同时处理。
其他更微妙的效果可以通过选择"做事情"的Thread
来实现。运行。考虑:
ExecutorService executorService = Executors.newSingleThreadExecutor(myThreadFactory);
将使"做某事"在自定义类型的线程中运行,而不是在为HttpRequestHandler
选择的线程类型中运行。
提交东西你会立即得到
仅仅因为你提交任务并立即"得到"它,这并不一定意味着它会立即由ExecutorService运行。
下面是一个实用的例子:
你有一个web服务器,它返回天气数据。当一个HTTP请求进来时,您需要连接到一个天气API来检索天气数据。
然而,天气API只允许两个并发连接。
解决这个问题的一种方法是只有两个可用线程的ExecutorService。
现在,无论有多少servlet线程同时提交任务以供执行,您都可以确保一次只有两个线程可以针对天气API执行请求。servlet线程将阻塞,直到天气API可用来为servlet提供所请求的数据。
因为有人盲目地从Javadoc中复制了这句话,上面写着"如果您想立即阻塞等待任务,您可以使用result = exec.submit(aCallable).get();
">
形式的结构除了@Michael在他对这个问题的评论中提到的异常语义的细微差别之外,wrt也有细微的差别。中断线程:如果// do stuff
不间断地阻塞,您仍然可以中断对Future.get
的阻塞调用或取消未来。不过请注意,这对运行// do stuff
的线程没有任何影响。这个线程会一直阻塞,只有主线程才会被解除阻塞。
我能想到的唯一用例是:
- 调用它的代码必须同步返回一些结果(即它实现了一些同步api处理http请求或什么的)
- 被调用的执行器是一个ForkJoinPool,提交的任务是将在内部fork的RecursiveTask。这样你就可以使用多个cpu来执行整个任务。
在Future
上使用.get()
调用允许人们抽象出如何准确交付结果的细节。考虑到示例的长度,如果查看下面类的.get()
方法,就会发现在调用线程中实现相同类型的定时机制需要大量的样板代码。通过抽象它,调用线程只会无限期地阻塞,而工作线程则担心交付其Future
import java.util.Optional;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;
class DecayingRetry<T> implements Future<T> {
protected final ScheduledExecutorService executor;
protected final Callable<T> callable;
private final boolean isOwnExecutor;
protected ScheduledFuture<?> future;
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
protected Optional<?> result;
protected boolean isDone;
protected boolean isCancelled;
protected final ReentrantLock lock;
private final double maxDelay;
private double initialDelay;
private final TimeUnit timeUnit;
private DecayingRetry(ScheduledExecutorService executor,
boolean isOwnExecutor,
Callable<T> callable,
long initialDelay,
long maxDelay,
TimeUnit timeUnit) {
this.isOwnExecutor = isOwnExecutor;
lock = new ReentrantLock(true);
lock.lock();
this.executor = executor;
this.callable = callable;
isCancelled = false;
isDone = false;
if (maxDelay < 0) {
this.maxDelay = Double.POSITIVE_INFINITY;
} else {
this.maxDelay = maxDelay;
}
lock.lock();
this.initialDelay = (double) initialDelay;
this.timeUnit = timeUnit;
future = executor.schedule(this::delayLoop, initialDelay, timeUnit);
}
public static <T> T on(Callable<T> callable, long initialDelay, long maxDelay, TimeUnit timeUnit) throws Exception {
try {
return Optional.ofNullable(callable.call()).orElseThrow(IllegalStateException::new);
} catch (IllegalStateException ignored) {
try {
return new DecayingRetry<>(Executors.newSingleThreadScheduledExecutor(),
true,
callable,
initialDelay,
maxDelay,
timeUnit).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
System.exit(-1);
}
}
return null;
}
public static <T> T on(Callable<T> callable,
ScheduledExecutorService executor,
long initialDelay,
long maxDelay,
TimeUnit timeUnit) throws Exception {
try {
return Optional.ofNullable(callable.call()).orElseThrow(IllegalStateException::new);
} catch (IllegalStateException ignored) {
try {
return new DecayingRetry<>(executor,
false,
callable,
initialDelay,
maxDelay,
timeUnit).get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
System.exit(-1);
}
}
return null;
}
synchronized private void delayLoop() {
if (isDone) {
return;
}
try {
result = Optional.ofNullable(callable.call());
} catch (Exception e) {
result = Optional.of(e);
isDone = true;
return;
}
if (!result.isPresent()) {
if (initialDelay < maxDelay) {
initialDelay *= 1.618033988749; //PHI
initialDelay = Math.min(maxDelay, initialDelay);
}
future = executor.schedule(this::delayLoop, (long) initialDelay, timeUnit);
} else {
isDone = true;
lock.unlock();
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
if (isDone) {
return false;
} else if (future.cancel(mayInterruptIfRunning)) {
isCancelled = true;
isDone = true;
return true;
}
return false;
}
@Override
public boolean isCancelled() {
return isCancelled;
}
@Override
public boolean isDone() {
return isDone;
}
@Override
@NotNull
public T get() throws InterruptedException, ExecutionException {
lock.lock();
while (!isDone) { // lock acquired too early for some reason, so we allow the worker thread to grab it
lock.unlock();
lock.lock();
}
if (result.isPresent()) {
if (result.get() instanceof Throwable) {
throw new ExecutionException((Throwable) result.get());
}
if (isOwnExecutor) {
executor.shutdown();
}
//noinspection unchecked
return (T) result.get();
}
throw new ExecutionException(new IllegalStateException("Retry result was null"));
}
public T get(long timeout, @NotNull TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
throw new ExecutionException(new IllegalStateException("Not implemented"));
}
}