Understanding Java FixedThreadPool



我试图了解Java FixedThreadPool在实践中是如何工作的,但文档没有回答我的问题。

假设一个简单的场景,如:

ExecutorService ES= Executors.newFixedThreadPool(3);
List<Future> FL;
for(int i=1;i<=200;i++){
   FL.add(ES.submit(new Task()));
}
ES.shutdown();

其中Task是构造一些资源、使用它们并返回一些输出的Callable

我的问题:完成for循环后,内存中有多少Task?换句话说:一次是否只有3个Task在构建他们的资源,或者所有的资源都是预先创建的,这样,在.submit之后,我有200个Task(及其资源)等待执行?

注意:资源构造发生在Task的构造函数中,而不是call()方法中。

在javadoc中(可以跳过以下内容):让我困惑的是Java文档中的以下解释

创建一个线程池,该线程池重用固定数量的操作线程离开共享的无限制队列。在任何时候,最多nThreads线程将是活动的处理任务。

我想这意味着,在我的示例中,所有200个Task都在队列中,但在任何时候都只执行其中的3个。

非常感谢您的帮助。

您的代码相当于

for (int i = 1; i <= 200; i++){
    Task t = new Task();
    FL.add(ES.submit(t));
}

在for循环之后,Task的构造函数被调用了200次,它所包含的代码也被执行了200次。任务是否提交给执行器是无关紧要的:您在一个循环中调用构造函数200次,在构建完每个任务后,它就会提交给执行者。执行器不是调用任务构造函数的执行器。

任务将从队列中逐个删除,因此随着执行的进行,任务将被删除,并且只有它们的结果将存储在那些Future对象中。

所以基本上在记忆中:

3个线程
200->0任务
0->200未来
(每个执行的任务)

您正在使用new Task()创建200个对象,这些任务将提交给executor。执行者持有对这个Task对象的引用。因此,如果在Task的构造函数中,您正在构造并持有资源,那么所有200任务都将持有资源。

如果可能的话,如果不希望200个实例来构造和保存资源,可以在Task的调用方法中构造和使用资源。在这种情况下,一次只有3个Task将构造并保持资源。

所有200个任务都被创建并消耗资源,并且所有任务都在队列中。

线程池仅在有空闲线程可供执行时逐个调用其run()/call()方法。

要理解这一点,您需要了解在循环中将任务提交给Executor时发生了什么。首先,我们将只关注向执行器提交单个任务。我现在将参考JDK 1.7.0_51源代码

静态方法CCD_ 16方法返回包含阻塞队列的CCD_

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

当你向这个Executor添加一个任务时,就会转到ThreadPoolExecutor的submit方法,扩展AbstractExecutorService,在那里编写submit方法的实现。

public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }

执行方法是特定于实现的(意味着不同类型的执行器以不同的方式实现它)

现在真正的肉来了。这是ThreadPoolExecutor中定义的执行方法尤其要注意评论在这里,很少有像corePoolSize这样的ThreadPoolExecutor的配置参数发挥作用。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

相关内容

  • 没有找到相关文章

最新更新