我有一个生产者-消费者模式。 1 个生产者(做一些工作)和 7 个消费者(在 8 核机器上)。生产者应该一次下载 7 个文件(2000 个文件),并等待 7 个线程处理它们,然后再继续下一个 7 个。设置大致如下:
ExecutorService threadPool = Executors.newFixedThreadPool(8);
int pollWait = 4;
int queSize = 7;
Broker broker = new Broker(queSize,pollWait);
代理实现了一个容量为 7 的 LinkedBlockingQueue。
Producer producer = new Producer();
producer.setBroker(broker);
Future<Integer> producerStatus = threadPool.submit(producer);
int numConsumerThreads = 7;
ArrayList<Future<Integer>> consumers = new ArrayList<Future<Integer>>();
for(int c=0; c < numConsumerThreads;c++)
{
String threadName = "consumer-"+(c+1);
Consumer consumer = new Consumer(threadName);
consumer.setBroker(broker);
Future<Integer> consumerStatus = threadPool.submit(consumer);
consumers.add(consumerStatus);
}
if(producerStatus.isDone())
{
Integer numFilesRead = producerStatus.get();
System.out.println("[INFO] Total number of files downloaded by producer: " + numFilesRead);
}
int k=0,numIndexedByThread=0;
while(!consumers.isEmpty())
{
final Iterator<Future<Integer>> i = consumers.iterator();
while (i.hasNext())
{
Future<Integer> f = i.next();
if (f.isDone())
{
i.remove();
numIndexedByThread = f.get();
k += numIndexedByThread;
}
}
}
System.out.println("[INFO] Total number of files indexed: " + k);
threadPool.shutdown();
我看到的问题是程序无法执行。
您已序列化结果的处理
以下行阻止并序列化结果的整个处理:
Integer numConsumedByThread = consumerStatus.get();
正确的习惯用法是检查每个isDone()
,并且仅在isDone()
返回 true 时才调用 .get()
,否则.get()
会阻塞循环,直到它有要返回的结果。
伪编码
while(!listOfFutures.isEmpty())
{
final Iterator<Future> i = listofFutures.iterator;
while (i.hasNext())
{
Future f = i.next();
if (f.isDone())
{
i.remove();
// call .get() and process the completed result
}
}
}
阅读 JavaDocs 以获得Future.get()