如何实现执行器服务以执行批量任务



我正在寻找一种在java中执行批量任务的方法。这个想法是拥有一个基于线程池的ExecutorService,这将允许我将一组Callablemain线程分散到不同的线程之间。此类应提供一个 waitForCompletion 方法,该方法将使main线程进入睡眠状态,直到执行所有任务。然后应该唤醒main线程,它将执行一些操作并重新提交一组任务。

此过程将重复多次,因此我想使用ExecutorService.shutdown因为这需要创建多个ExecutorService实例。

目前,我已使用以下AtomicIntegerLock/Condition通过以下方式实现它:

public class BatchThreadPoolExecutor extends ThreadPoolExecutor {
  private final AtomicInteger mActiveCount;
  private final Lock          mLock;
  private final Condition     mCondition;
  public <C extends Callable<V>, V> Map<C, Future<V>> submitBatch(Collection<C> batch){
    ...
    for(C task : batch){
      submit(task);
      mActiveCount.incrementAndGet();
    }
  }
  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    mLock.lock();
    if (mActiveCount.decrementAndGet() == 0) {
      mCondition.signalAll();
    }
    mLock.unlock();
  }
  public void awaitBatchCompletion() throws InterruptedException {
    ...
    // Lock and wait until there is no active task
    mLock.lock();
    while (mActiveCount.get() > 0) {
      try {
        mCondition.await();
      } catch (InterruptedException e) {
        mLock.unlock();
        throw e;
      }
    }
    mLock.unlock();
  } 
}

请不要说我不一定会一次提交批次中的所有任务,因此CountDownLatch似乎不是一种选择。

这是一种有效的方法吗?有没有更有效/更优雅的方式来实现它?

谢谢

我认为执行器服务本身将能够执行您的要求。

调用invokeAll([...])并迭代所有任务。所有任务都已完成,如果你可以遍历所有期货。

正如其他答案所指出的,您的用例中似乎没有任何部分需要自定义的ExecutorService。

在我看来,您需要做的就是提交一个批次,等待它们全部完成,同时忽略主线程上的中断,然后根据第一批的结果提交另一批。 我相信这只是一个问题:

    ExecutorService service = ...;
    Collection<Future> futures = new HashSet<Future>();
    for (Callable callable : tasks) {
        Future future = service.submit(callable);
        futures.add(future);
    }
    for(Future future : futures) {
        try {
            future.get();
        } catch (InterruptedException e) {
            // Figure out if the interruption means we should stop.
        }
    }
    // Use the results of futures to figure out a new batch of tasks.
    // Repeat the process with the same ExecutorService.

我同意@ckuetbach,默认的Java Executors应该为您提供执行"批量"作业所需的所有功能。

如果我是你,我只会提交一堆工作,等待它们完成ExecutorService.awaitTermination(),然后启动一个新的ExecutorService。 这样做是为了节省"线程创建"是为时过早的优化,除非您每秒执行此操作 100 次或其他什么。

如果您真的坚持对每个批次使用相同的ExecutorService,那么您可以自己分配一个ThreadPoolExecutor,并在循环中查看ThreadPoolExecutor.getActiveCount()。 像这样:

BlockingQueue jobQueue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS,
    0L, TimeUnit.MILLISECONDS, jobQueue);
// submit your batch of jobs ...
// need to wait a bit for the jobs to start
Thread.sleep(100);
while (executor.getActiveCount() > 0 && jobQueue.size() > 0) {
    // to slow the spin
    Thread.sleep(1000);
}
// continue on to submit the next batch

最新更新