我正在寻找一种在java中执行批量任务的方法。这个想法是拥有一个基于线程池的ExecutorService
,这将允许我将一组Callable
从main
线程分散到不同的线程之间。此类应提供一个 waitForCompletion 方法,该方法将使main
线程进入睡眠状态,直到执行所有任务。然后应该唤醒main
线程,它将执行一些操作并重新提交一组任务。
此过程将重复多次,因此我想使用ExecutorService.shutdown
因为这需要创建多个ExecutorService
实例。
目前,我已使用以下AtomicInteger
和Lock
/Condition
通过以下方式实现它:
public class BatchThreadPoolExecutor extends ThreadPoolExecutor {
private final AtomicInteger mActiveCount;
private final Lock mLock;
private final Condition mCondition;
public <C extends Callable<V>, V> Map<C, Future<V>> submitBatch(Collection<C> batch){
...
for(C task : batch){
submit(task);
mActiveCount.incrementAndGet();
}
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
mLock.lock();
if (mActiveCount.decrementAndGet() == 0) {
mCondition.signalAll();
}
mLock.unlock();
}
public void awaitBatchCompletion() throws InterruptedException {
...
// Lock and wait until there is no active task
mLock.lock();
while (mActiveCount.get() > 0) {
try {
mCondition.await();
} catch (InterruptedException e) {
mLock.unlock();
throw e;
}
}
mLock.unlock();
}
}
请不要说我不一定会一次提交批次中的所有任务,因此CountDownLatch
似乎不是一种选择。
这是一种有效的方法吗?有没有更有效/更优雅的方式来实现它?
谢谢
我认为执行器服务本身将能够执行您的要求。
调用invokeAll([...])
并迭代所有任务。所有任务都已完成,如果你可以遍历所有期货。
正如其他答案所指出的,您的用例中似乎没有任何部分需要自定义的ExecutorService。
在我看来,您需要做的就是提交一个批次,等待它们全部完成,同时忽略主线程上的中断,然后根据第一批的结果提交另一批。 我相信这只是一个问题:
ExecutorService service = ...;
Collection<Future> futures = new HashSet<Future>();
for (Callable callable : tasks) {
Future future = service.submit(callable);
futures.add(future);
}
for(Future future : futures) {
try {
future.get();
} catch (InterruptedException e) {
// Figure out if the interruption means we should stop.
}
}
// Use the results of futures to figure out a new batch of tasks.
// Repeat the process with the same ExecutorService.
我同意@ckuetbach,默认的Java Executors
应该为您提供执行"批量"作业所需的所有功能。
如果我是你,我只会提交一堆工作,等待它们完成ExecutorService.awaitTermination()
,然后启动一个新的ExecutorService
。 这样做是为了节省"线程创建"是为时过早的优化,除非您每秒执行此操作 100 次或其他什么。
如果您真的坚持对每个批次使用相同的ExecutorService
,那么您可以自己分配一个ThreadPoolExecutor
,并在循环中查看ThreadPoolExecutor.getActiveCount()
。 像这样:
BlockingQueue jobQueue = new LinkedBlockingQueue<Runnable>();
ThreadPoolExecutor executor = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS,
0L, TimeUnit.MILLISECONDS, jobQueue);
// submit your batch of jobs ...
// need to wait a bit for the jobs to start
Thread.sleep(100);
while (executor.getActiveCount() > 0 && jobQueue.size() > 0) {
// to slow the spin
Thread.sleep(1000);
}
// continue on to submit the next batch