我正在使用executor服务并行运行任务。并行运行方法接受输入整数并返回整数。由于并行任务具有返回类型,所以我使用了Callable匿名类。您可以在下面的示例中看到ExecutorServiceExample task(int i )
是从executor调用的。任务方法还有1秒的等待时间,并为i==7;
抛出异常
在下面的实现中,我使用invokeAll和isDone,并尝试收集数据。
下面的程序抛出IllegalMonitorStateException
。
Future任务迭代和检查isDone和get((有什么问题。如何处理特定调用的异常。我想并行运行所有1到14个任务,并在所有完成时收集返回类型。此外,在出现错误的情况下,应该知道哪些输入出现异常,如(7和14(
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
class MyException extends Exception{
MyException(String message) {
super(message);
}
}
public class ExecutorServiceExample {
public int task(int i) throws MyException, InterruptedException {
System.out.println("Running task.."+i);
wait(1000);
if(i%7==0) {
throw new MyException("multiple of 7 not allowed");
}
return i;
}
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<Callable<Integer>> tasks = Arrays.asList(1,2,3,4,5,6,7,8,9,10,11,12,13,14).stream().map(id->{
return new Callable<Integer>() {
@Override
public Integer call() throws Exception {
ExecutorServiceExample executorServiceExample = new ExecutorServiceExample();
return executorServiceExample.task(id);
}
};
}).collect(Collectors.toList());
try{
List<Future<Integer>> results = executorService.invokeAll(tasks);
for (Future<Integer> task: results) {
if(task.isDone()){
System.out.println(task.get());
}
}
}catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}finally {
executorService.shutdown();
}
}
}
事实上,每个任务都会生成一个IllegalMonitorStateException
,因为您没有在synchronized
块中调用wait
方法:等待((调用时出现IllegalMonitorStateException。也许您应该使用sleep
而不是wait
。
CCD_ 9由CCD_。因此,如果缩小try-catch
的范围,实际上会捕获14个异常:
for (Future<Integer> task: results) {
try {
System.out.println(task.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
我不知道你为什么这样设计它。但很明显,它有很多问题。i/7==0 or i % 7 ==0
?examples
不是锁,为什么要使用wait?"invokeAll"返回的期货必须完成,但可能在invoke-get时获得异常。这是你想要的吗?
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.Stream;
class MyException extends Exception {
MyException(String message) {
super(message);
}
}
public class ExecutorServiceExample {
public int task(int i) throws MyException, InterruptedException {
TimeUnit.MILLISECONDS.sleep(1000);
if (i % 7 == 0) {
throw new MyException("multiple of 7 not allowed");
}
return i;
}
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<Callable<Integer>> tasks = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)
.map(id -> (Callable<Integer>) () -> {
ExecutorServiceExample executorServiceExample = new ExecutorServiceExample();
return executorServiceExample.task(id);
}).collect(Collectors.toList());
List<Future<Integer>> results = executorService.invokeAll(tasks);
executorService.shutdown();
for (Future<Integer> task : results) {
try {
System.out.println(task.get());
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
}