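Problem statement:-
Once all the tasks of the ExecutorService have finished executing their jobs, I need to insert some values into the database. In other words, I can insert into the database only when all the tasks have finished executing. What I need to insert depends on all the threads having completed their tasks.
So how can I check whether all the tasks of the ExecutorService have finished executing, and only then start inserting into the database?
Below is the code in which I create the tasks using ThreadPoolExecutor.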
executorService = new ThreadPoolExecutor(
        noOfThreads,
        noOfThreads,
        500L,
        TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<Runnable>(noOfThreads),
        new ThreadPoolExecutor.CallerRunsPolicy()
);
// Running for particular duration of time
while (System.currentTimeMillis() <= endTime) {
    Command newCommand = getNextCommand();
    Task nextRunnable = new Task(newCommand, existId, newId);
    executorService.submit(nextRunnable); // Submit it for execution
}
/*
Previously I was inserting into the database here, but that was wrong because some
threads might still be in progress. If I insert into the database right here, I will
be losing some information for sure. So how can I check whether all the tasks have
finished executing, so that I can then insert into the database?
*/
executorService.shutdown();
if (!executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)) {
    executorService.shutdownNow();
}
The code I will use for the insert is below-
// Inserting into Database when all the task have finished executing, currently I
for (Entry<Integer, LinkedHashMap<Integer, String>> entry : GUID_ID_MAPPING.entrySet()) {
    pstatement = db_connection.prepareStatement(LnPConstants.UPSERT_SQL);
    pstatement.setInt(1, entry.getKey());
    pstatement.setString(2, entry.getValue().get(LnPConstants.CGUID_ID));
    pstatement.setString(3, entry.getValue().get(LnPConstants.PGUID_ID));
    pstatement.executeUpdate();
}
So I need to put this for loop somewhere that runs only after all the tasks have finished executing.
Update:-
So something like this-
executorService.shutdown();
if (!executorService.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS)) {
    executorService.shutdownNow();
}
// Now insert into the database, once all the tasks have finished executing
for (Entry<Integer, LinkedHashMap<Integer, String>> entry : GUID_ID_MAPPING.entrySet()) {
    pstatement = db_connection.prepareStatement(PDSLnPConstants.UPSERT_SQL);
    pstatement.setInt(1, entry.getKey());
    pstatement.setString(2, entry.getValue().get(PDSLnPConstants.CGUID_ID));
    pstatement.setString(3, entry.getValue().get(PDSLnPConstants.PGUID_ID));
    pstatement.executeUpdate();
}
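You basically already have the answer. Call shutdown() and then wait for executorService.awaitTermination to return; if it returns true, the executor has completed all of its tasks (false means the timeout elapsed first).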
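As a rough, self-contained illustration of the shutdown-then-wait pattern, here is a minimal sketch. The class name AwaitTerminationSketch, the method runAndWait, the pool size of 4, and the one-minute timeout are all made up for this example and are not from the question; the counter simply stands in for the real work done by Task.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; names and sizes are assumptions for illustration only.
class AwaitTerminationSketch {

    // Runs taskCount trivial tasks and returns how many had finished
    // by the time awaitTermination returned.
    static int runAndWait(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        AtomicInteger finished = new AtomicInteger();
        for (int i = 0; i < taskCount; i++) {
            pool.submit(() -> { finished.incrementAndGet(); });
        }
        pool.shutdown(); // stop accepting new tasks; already-submitted tasks still run
        try {
            // true means every submitted task finished before the timeout elapsed
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                pool.shutdownNow(); // timed out: try to cancel whatever is still running
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        // Safe point for follow-up work (the database insert in the question)
        return finished.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(20) + " tasks finished");
    }
}
```

A finite timeout is used here only so the sketch cannot hang; whether that or an effectively unbounded wait is right depends on how long the real tasks may run.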
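Note that waiting on awaitTermination alone ignores tasks that may have failed with an error. An alternative that also checks for errors is something like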
List<Future<?>> futures = ...
for (...) {
    futures.add(executor.submit(...));
}
for (Future<?> f : futures) {
    // get() throws an ExecutionException if an exception
    // occurred while executing the task; insert error handling
    // as appropriate, perhaps calling cancel on tasks that
    // have not yet completed
    f.get();
}
// at this point all tasks have completed
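To make the outline above concrete, here is one possible runnable version. Everything named here (FutureCheckSketch, countFailures, the failAt parameter, the pool size of 4) is hypothetical and exists only to show that Future.get() surfaces a task's exception as an ExecutionException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class; names are assumptions for illustration only.
class FutureCheckSketch {

    // Submits taskCount tasks (the task whose index equals failAt throws)
    // and returns how many tasks failed.
    static int countFailures(int taskCount, int failAt) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            futures.add(pool.submit(() -> {
                if (id == failAt) {
                    throw new IllegalStateException("task " + id + " failed");
                }
            }));
        }
        int failures = 0;
        try {
            for (Future<?> f : futures) {
                try {
                    f.get(); // blocks until this task is done; rethrows its failure
                } catch (ExecutionException e) {
                    failures++; // e.getCause() is the exception thrown inside the task
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop waiting, keep the interrupt flag
        } finally {
            pool.shutdown();
        }
        // Unless interrupted, every future has been waited on, so all tasks are done
        return failures;
    }

    public static void main(String[] args) {
        System.out.println("failed tasks: " + countFailures(10, 3));
    }
}
```

In the question's setting, the database insert would go after the loop over the futures, possibly skipped or adjusted when the failure count is non-zero.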
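Another class worth looking at is ExecutorCompletionService. It lets you retrieve tasks in the order in which they actually complete.
This looks like a good use case for ExecutorCompletionService; have you considered it? Check the JavaDoc for ExecutorCompletionService. You can do something like this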
// Wrap your ThreadPoolExecutor in an ExecutorCompletionService (ECS)
ExecutorCompletionService<Boolean> ecs = new ExecutorCompletionService<Boolean>(executorService);
int noOfTasks = 0;
// Running for particular duration of time
while (System.currentTimeMillis() <= endTime) {
    Command newCommand = getNextCommand();
    Task nextRunnable = new Task(newCommand, existId, newId);
    ecs.submit(nextRunnable, Boolean.TRUE); // Submit it for execution through the ECS
    noOfTasks++;
}
// Now we can use the ECS to tell us which tasks have finished
for (int i = 0; i < noOfTasks; i++) {
    try {
        // Blocking call: take() waits until any one of the submitted tasks has completed
        Boolean result = ecs.take().get();
        // Handle the result here if you like; you could also return a unique id instead
        // of Boolean so that you can keep track of which tasks have completed
    } catch (ExecutionException e) {
        // do error handling for task failure here
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag and stop waiting
        break;
    }
}
// Once you are here (and the wait was not interrupted), all tasks have completed, so call
// your database code here. You can wrap the whole segment in try/finally and shut down the
// underlying executorService in the finally block
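For reference, a compact standalone variant of the same idea, in which each task returns its own id instead of a Boolean, as suggested in the comment above. The names CompletionServiceSketch and sumOfCompletedIds and the pool size of 4 are invented for this sketch; summing the ids is just a stand-in for tracking which tasks completed.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; names are assumptions for illustration only.
class CompletionServiceSketch {

    // Submits taskCount tasks that each return their own id,
    // then returns the sum of the ids collected as tasks complete.
    static int sumOfCompletedIds(int taskCount) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        ExecutorCompletionService<Integer> ecs = new ExecutorCompletionService<>(pool);
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            ecs.submit(() -> id); // Callable<Integer>: the result identifies the task
        }
        int sum = 0;
        try {
            for (int i = 0; i < taskCount; i++) {
                try {
                    // take() blocks until the next task, in completion order, is done
                    sum += ecs.take().get();
                } catch (ExecutionException e) {
                    // a task threw; handle or log e.getCause() here
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // stop waiting, keep the interrupt flag
        } finally {
            pool.shutdown(); // the completion service itself has nothing to close
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println("sum of completed ids: " + sumOfCompletedIds(5));
    }
}
```

Counting submissions and calling take() exactly that many times is what guarantees every task has been accounted for before the follow-up work starts.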