执行器服务中的线程未完成



我有一个生产者-消费者模式。 1 个生产者(做一些工作)和 7 个消费者(在 8 核机器上)。生产者应该一次下载 7 个文件(2000 个文件),并等待 7 个线程处理它们,然后再继续下一个 7 个。设置大致如下:

            ExecutorService threadPool = Executors.newFixedThreadPool(8);
        int pollWait = 4; 
        int queSize = 7;
        Broker broker = new Broker(queSize,pollWait);

代理实现了一个容量为 7 的 LinkedBlockingQueue。

        Producer producer = new Producer(); 
        producer.setBroker(broker);
        Future<Integer> producerStatus = threadPool.submit(producer);
        int numConsumerThreads = 7;
        ArrayList<Future<Integer>> consumers = new ArrayList<Future<Integer>>();
        for(int c=0; c < numConsumerThreads;c++)
        {
            String threadName = "consumer-"+(c+1);
            Consumer consumer = new Consumer(threadName);
            consumer.setBroker(broker);
            Future<Integer> consumerStatus = threadPool.submit(consumer);
            consumers.add(consumerStatus);                            
        }   
        if(producerStatus.isDone())
        {
            Integer numFilesRead = producerStatus.get();
            System.out.println("[INFO] Total number of files downloaded by producer: " + numFilesRead);
        }
        int k=0,numIndexedByThread=0;
        while(!consumers.isEmpty())
        {
            final Iterator<Future<Integer>> i = consumers.iterator();
            while (i.hasNext())
            {                   
               Future<Integer> f = i.next();
               if (f.isDone())
               {
                  i.remove();
                  numIndexedByThread = f.get();
                  k += numIndexedByThread;
               } 
             }
        }
        System.out.println("[INFO] Total number of files indexed: " + k);
        threadPool.shutdown();

我看到的问题是程序无法执行。

您已序列化结果的处理

以下行阻止并序列化结果的整个处理:

Integer numConsumedByThread = consumerStatus.get();

正确的习惯用法是检查每个isDone(),并且仅在isDone()返回 true 时才调用 .get(),否则.get()会阻塞循环,直到它有要返回的结果。

伪编码

while(!listOfFutures.isEmpty())
{
    final Iterator<Future> i = listofFutures.iterator;
    while (i.hasNext())
    {
       Future f = i.next();
       if (f.isDone())
       {
          i.remove();
          // call .get() and process the completed result 
       } 
     }
}

阅读 JavaDocs 以获得Future.get()

最新更新