我试图了解Java FixedThreadPool在实践中是如何工作的,但文档没有回答我的问题。
假设一个简单的场景,如:
ExecutorService ES= Executors.newFixedThreadPool(3);
List<Future> FL;
for(int i=1;i<=200;i++){
FL.add(ES.submit(new Task()));
}
ES.shutdown();
其中Task
是构造一些资源、使用它们并返回一些输出的Callable
。
我的问题:完成for
循环后,内存中有多少Task
?换句话说:一次是否只有3个Task
在构建他们的资源,或者所有的资源都是预先创建的,这样,在.submit
之后,我有200个Task
(及其资源)等待执行?
注意:资源构造发生在Task
的构造函数中,而不是call()
方法中。
在javadoc中(可以跳过以下内容):让我困惑的是Java文档中的以下解释
创建一个线程池,该线程池重用固定数量的操作线程离开共享的无限制队列。在任何时候,最多nThreads线程将是活动的处理任务。
我想这意味着,在我的示例中,所有200个Task都在队列中,但在任何时候都只执行其中的3个。
非常感谢您的帮助。
您的代码相当于
for (int i = 1; i <= 200; i++){
Task t = new Task();
FL.add(ES.submit(t));
}
在for循环之后,Task的构造函数被调用了200次,它所包含的代码也被执行了200次。任务是否提交给执行器是无关紧要的:您在一个循环中调用构造函数200次,在构建完每个任务后,它就会提交给执行者。执行器不是调用任务构造函数的执行器。
任务将从队列中逐个删除,因此随着执行的进行,任务将被删除,并且只有它们的结果将存储在那些Future对象中。
所以基本上在记忆中:
3个线程
200->0任务
0->200未来
(每个执行的任务)
您正在使用new Task()
创建200个对象,这些任务将提交给executor。执行者持有对这个Task
对象的引用。因此,如果在Task
的构造函数中,您正在构造并持有资源,那么所有200任务都将持有资源。
如果可能的话,如果不希望200个实例来构造和保存资源,可以在Task
的调用方法中构造和使用资源。在这种情况下,一次只有3个Task
将构造并保持资源。
所有200个任务都被创建并消耗资源,并且所有任务都在队列中。
线程池仅在有空闲线程可供执行时逐个调用其run()/call()方法。
要理解这一点,您需要了解在循环中将任务提交给Executor时发生了什么。首先,我们将只关注向执行器提交单个任务。我现在将参考JDK 1.7.0_51
源代码
静态方法CCD_ 16方法返回包含阻塞队列的CCD_
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
当你向这个Executor添加一个任务时,就会转到ThreadPoolExecutor
的submit方法,扩展AbstractExecutorService
,在那里编写submit方法的实现。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
执行方法是特定于实现的(意味着不同类型的执行器以不同的方式实现它)
现在真正的肉来了。这是ThreadPoolExecutor
中定义的执行方法尤其要注意评论在这里,很少有像corePoolSize
这样的ThreadPoolExecutor的配置参数发挥作用。
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}