可调用程序的并行执行



我想并行执行多个可调用程序。但似乎ExecutorService总是等到所有可调用文件都完成。

我试过以下几种:

final int nThreads = 10;
ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
List<PrimeCallable> tasks = new ArrayList<PrimeCallable>();
for(int i = 0; i < nThreads; i++) {
    tasks.add(new PrimeCallable(0, i * 100 + 100, "thread" + i));
}
try {
    for(Future<List<Integer>> result : executorService.invokeAll(tasks)) {
        List<Integer> integers = result.get();
        for(Integer i : integers){
            System.out.println(i);
        }
    }
} catch (InterruptedException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
} catch (ExecutionException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}

现在,当executedService中的所有可调用对象都完成finishing时,就会调用for循环。据我所知,没有executorServices.isParallel-setter;-)。

让可调用程序并行运行的正确方法是什么?

谢谢你的提示!

invokeAll的javadocs说;

执行给定的任务,返回持有其状态的期货列表并且当全部完成时结果。Future.isDone()对于返回列表的每个元素都为true。

因此invokeAll会阻塞,直到集合中的每个任务都完成为止。

Executor服务并行运行所有可调用程序。它所做的只是等待所有并行任务完成后再继续。因此,它不像所有任务都是串行运行的。

听起来你想要的是lazy执行——你不想在提取结果之前在内存中复制结构。

我认为这是一个迭代+转换的问题。首先,在您的输入上定义一个迭代器,这样每次对next()的调用都会返回一个Callable,它将在您的系列中产生下一个值。

转换阶段是对这些Callables应用并行或并发评估,类似于以下内容(未测试):

public class ConcurrentTransform
{
  private final ExecutorService executor;
  private final int maxBuffer;
  public ConcurrentTransform(ExecutorService executor, int maxWorkBuffer) {
    this.executor = executor;
    this.maxBuffer = Math.max(1, maxWorkBuffer);
  }
  public <T> Iterator<T> apply(final Iterator<Callable<T>> input) {
    // track submitted work
    final BlockingQueue<Future<T>> submitted = new LinkedBlockingQueue<Future<T>>();
    // submit first N tasks
    for (int i=0; i<maxBuffer && input.hasNext(); i++) {
      Callable<T> task = input.next();
      Future<T> future = executor.submit(task);
      submitted.add(future);
    }
    return new Iterator<T>(){
      @Override
      public synchronized boolean hasNext() {
        return !submitted.isEmpty();
      }
      @Override
      public T next() {
        Future<T> result;
        synchronized (this) {
          result = submitted.poll();
          if (input.hasNext()) {
            submitted.add(executor.submit(input.next()));
          }
        }
        if (result != null) {
          try {
            return result.get(); // blocking
          } catch (Exception e) {
            if (e instanceof RuntimeException) {
               throw (RuntimeException) e;
            } else {
               throw new RuntimeException(e);
            }
          }
        } else {
          throw new NoSuchElementException();
        }
      }
      @Override
      public void remove() {
        throw new UnsupportedOperationException();
      }};
  }
}

在调用apply(…)之后,您将迭代得到的值,这些值将并行执行Callable对象,并以与输入相同的顺序返回结果。一些改进是允许阻塞result.get()调用有一个可选的超时,或者在转换本身中管理线程池。

如果您想在结果发生时查看结果,请使用ExecutorCompletionService

相关内容

  • 没有找到相关文章

最新更新