Future和ExecutorService,如何知道取消的任务何时终止



假设我有一些代码使用ExecutorService启动任务,然后调用程序通过submit()方法返回的Future取消它:

execService = Executors.newSingleThreadExecutor ();
Future<String> result = execService.submit ( () -> {
for ( ... ) {
if ( Thread.interrupted() ) break;
// Stuff that takes a while
}
return "result";
});
...
result.cancel ( true );
// I'm not sure the corresponding thread/task has finished and the call() method 
// above returned
// result.isDone() is immediately true. result.get() throws CancellationException

因此,我可以取消后台任务,让执行器准备重新开始。但是,在被中断的任务完成中断并返回相应的方法之前,我不能执行后者。

注意,在上面的代码中,主流在result.cancel()之后直接返回,而result.isCancelled()在那之后直接为true,同时,并行任务可能需要一段时间才能再次检查Thread.interrupted()并终止。在继续主线程之前,我需要确保副任务已经完全完成。

此外,请注意,使用单个线程执行器是偶然的,除了这个简单的例子之外,我想解决只有一个并行线程的情况和运行多个线程的情况下的问题。

确定这一点的最佳方法是什么?

到目前为止,我已经考虑过引入一个对任务和调用程序都可见的标志,或者关闭执行器并等待它完成。如果想多次重用执行器,后一种解决方案可能效率低下,前者更好,但我想知道是否还有其他更简单或更规范的方法。

我也遇到过类似的问题,我最终制作了这个实用程序:

private enum TaskState {
READY_TO_START,
RUNNING,
DONE,
MUST_NOT_START;
}

/**
* Runs the given set of tasks ensuring that:
* 
* If one fails this will not return while tasks are running and once returned
* no more tasks will start.
* 
* 
* @param <V>
* @param pool
* @param tasksToRun
*/
public <V> void runAndWait(ExecutorService pool, List<Callable<V>> tasksToRun) {

// We use this to work around the fact that the future doesn't tell us if the task is actually terminated or not.
List<AtomicReference<TaskState>> taskStates = new ArrayList<>();
List<Future<V>> futures = new ArrayList<>();

for(Callable<V> c : tasksToRun) {
AtomicReference<TaskState> state = new AtomicReference<>(TaskState.READY_TO_START);
futures.add(pool.submit(new Callable<V>() {
@Override
public V call() throws Exception {
if(state.compareAndSet(TaskState.READY_TO_START, TaskState.RUNNING)) {
try {
return c.call();
} finally {
state.set(TaskState.DONE);
}
} else {
throw new CancellationException();
}
}

}));
taskStates.add(state);
}
int i = 0;
try {

// Wait for all tasks.
for(; i < futures.size(); i++) {
futures.get(i).get(7, TimeUnit.DAYS);
}
} catch(Throwable t) { 
try {
// If we have tasks left that means something failed.
List<Throwable> exs = new ArrayList<>();
final int startOfRemaining = i;

// Try to stop all running tasks if any left.
for(i = startOfRemaining; i < futures.size(); i++) {
try {
futures.get(i).cancel(true);
} catch (Throwable e) {
// Prevent exceptions from stopping us from
// canceling all tasks. Consider logging this.
}
}

// Stop the case that the task is started, but has not reached our compare and set statement. 
taskStates.forEach(s -> s.compareAndSet(TaskState.READY_TO_START, TaskState.MUST_NOT_START));

for(i = startOfRemaining; i < futures.size(); i++) {
try {
futures.get(i).get();
} catch (InterruptedException e) {
break; // I guess move on, does this make sense should we instead wait for our tasks to finish?
} catch (CancellationException e) {
// It was cancelled, it may still be running if it is we must wait for it.
while(taskStates.get(i).get() == TaskState.RUNNING) {
Thread.sleep(1);
}
} catch (Throwable t1) {
// Record the exception if it is interesting, so users can see why it failed.
exs.add(t1);
}
}

exs.forEach(t::addSuppressed);
} finally {
throw new RuntimeException(t);
}
}
}

最后,我把每一项任务都包装起来,这样我就可以知道是否所有任务都完成了。

根据Future.cancel(boolean)文档:

尝试取消执行此任务。如果任务已完成、已取消或由于其他原因无法取消,则此尝试将失败。如果成功,并且在调用cancel时此任务尚未启动,则不应运行此任务。如果任务已经启动,那么mayInterruptIfRunning参数确定是否应该中断执行此任务的线程以尝试停止任务。

此方法返回后,对isDone()的后续调用将始终返回true。如果此方法返回true,则对isCancelled()的后续调用将始终返回true。

根据get()文档:

Vget()抛出InterruptedException,ExecutionException如果需要,等待计算完成,然后检索其结果。

假设我们已经执行了一项任务,但出于充分的理由,我们不再关心结果了。我们可以使用Future.cancel(boolean)来告诉执行器停止操作并中断其底层线程:

future.cancel(true);

上面代码中的Future实例永远不会完成其操作。

事实上,如果我们尝试从该实例调用get(),在调用cancel()之后,结果将是CancellationException

Future.isCancelled()会告诉我们未来是否已经取消。这对于避免出现CancellationException非常有用。

对cancel()的调用可能失败。在这种情况下,其返回值将为false。请注意,cancel()将布尔值作为参数,它控制执行此任务的线程是否应该中断。

提交所有需要执行的任务。由于您创建了一个单线程执行器,因此在任何时候都不会有超过一个任务在运行。它们将在之后执行,其中一个将仅在前一个结束(完成或中断)后启动。你可以把它想象成一个队列。

您的代码可能如下所示:

Future<String> result1 = execService.submit(task-1);
Future<String> result2 = execService.submit(task-2);
Future<String> result3 = execService.submit(task-3);

更新1

你问:在继续主线程之前,我需要确保副任务完全完成常逻辑或处理中断请求——线程都很忙,所有其他任务都在等待它。

对于您的问题,我对一般情况感兴趣:告诉我们您如何定义一般。你的意思是线程的数量可以超过1吗?你的意思是,任何任务都可以依赖任意其他任务*吗?你的意思是,应该启动什么任务的逻辑取决于其他任务的执行结果吗?等等

Java中的执行程序的目的是封装与创建和监视线程相关的开销。使用执行器只能实现非常简单的工作流。如果你想实现一些复杂的逻辑,那么执行器不是最好的选择。

相反,可以考虑使用工作流引擎,如Activity、Camunda或Bonita。然后你将能够同时执行一些任务,你将能够指定在任务X开始之前,它应该等待,直到同时运行的任务A、B和C全部完成,等等

更新2

在你的代码中你有

if (Thread.interrupted()) ...

但这会检查main线程,而不是执行器执行任务的线程。

如果你想知道任务是如何执行的——它是否完成了整个工作,是否有任何异常(如NullPOinterException),或者它是否被中断——那么你应该使用其他方法,如以下方法:

Future<String> future1 = execService.submit(task1);
try {
String result1 = future1.get();
if (future1.isCancelled()) {
// Your reaction on cancellation
...
} else {
// If we are here, it means task has completed its whole work:
// - Task was not cancelled
// - Task thread was not interrupted
// - There was no unhandled exception
...
}
} catch (InterruptedException ie) {
// Your reaction on interruption.
...
} catch (ExecutionException ee) {
// Your reaction on unhandled exception in the task
...
}

如果在每个任务之后都有这样的逻辑,那么代码可能会非常复杂。在你实现它一个月后,可能很难理解这样的代码:)

你真的有如此复杂的逻辑,以至于一个任务应该分析前一个任务的执行结果吗?是否要根据上一个任务的执行结果来决定下一个启动的任务?

如果你唯一想确定的是前一个任务已经完成了它的所有活动——它的正常工作、异常处理、对线程中断的反应等等——那么在单线程执行器中也有这个。因为遗嘱执行人做这一切都是为了你们。

可能是你所期望的执行人的工作方式不是100%正确的。只需做一个包含中断的单线程执行器的小原型,并告诉我们为什么它不适合您的情况。

最新更新