如果任何线程发生异常,则中断所有线程



我有一个方法名称someTask,我必须调用 100 次,我正在使用异步编码,如下所示。

for (int i = 0; i < 100; i++) { 
futures.add(CompletableFuture.supplyAsync(  
() -> { someTask(); }, 
myexecutor
));
}

如果在执行someTask()时任何线程发生异常,我想中断所有当前线程并阻止未来的线程执行。处理此问题的最佳方法是什么?

更新: 我正在使用ThreadPoolTaskExecutor.

由于您似乎没有使用 CompletableFuture 特定方法,因此您可以直接使用ExecutorService

for (int i = 0; i < 100; i++) {
executor.submit(() -> {
try {
someTask();
} catch (InterruptedException e) {
//interrupted, just exit
Thread.currentThread().interrupt();
} catch (Exception e) {
//some other exception: cancel everything
executorService.shutdownNow();
}
});
}

shutdownNow将中断所有已提交的任务,执行者将拒绝新任务提交并RejectedExecutionException

这种方法的主要缺点是不能重用 ExecutorService,尽管您当然可以创建一个新的服务。如果这是一个问题,您将需要另一种方式。

请注意,为了有效地工作,someTask()需要定期检查线程的中断状态并在中断时退出,或使用可中断的方法。例如,如果someTask是一个运行某些计算的循环,并且您从未检查线程中断状态,则该循环将完全运行并且不会停止。

有很多方法可以做到这一点,并且都取决于确切的用例。但所有的方式(我会说(都有一个共同点:你必须以受控的方式结束任务。您不能只是"即时终止"任务的执行,因为这可能会导致数据不一致和其他问题。这通常是通过检查一些标志来完成的,例如Thread.isInterrupted(),如果发出执行应取消的信号,则手动结束执行。您也可以使用自定义标志,我将展示:

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
class Main  {

static final ExecutorService executor = Executors.newCachedThreadPool();


// just dummy stuff to simulate calculation
static void someStuff(AtomicBoolean cancel) {
if (cancel.get()) {
System.out.println("interrupted");
return;
}
System.out.println("doing stuff");
for (int i = 0; i < 100_000_000; i++) {
Math.sqrt(Math.log(i));
}
}


static void someTask(AtomicBoolean cancel) {
someStuff(cancel);
someStuff(cancel);
someStuff(cancel);
}


public static void main(String[] args) {
final Set<CompletableFuture<?>> futures = new HashSet<>();
final AtomicBoolean cancel = new AtomicBoolean();

for (int i = 0; i < 10; i++) { 
futures.add(CompletableFuture.supplyAsync(  
() -> {someTask(cancel); return null;}, executor
));
}
futures.add(CompletableFuture.supplyAsync(() -> {
try {
throw new RuntimeException("dummy exception");
}
catch (RuntimeException re) {
cancel.set(true);
}
return null;
}));

futures.forEach(cf -> cf.join());
executor.shutdownNow();
}
}

请注意,最后一个任务将引发异常,并通过将标志cancel设置为true来处理异常。所有其他任务将通过定期检查来看到这一点。如果标志发出信号,他们将取消执行。如果您注释掉抛出异常的任务,则所有任务都将正常完成。

请注意,此方法与所使用的执行程序无关。

最新更新