我是Java并发的新手,我遇到了一个非常奇怪的问题:我从一个大文件中读取数据,并使用几个工作线程处理输入(一些复杂的字符串匹配任务)。我使用LinkedBlockingQueue将数据传输到工作线程,并使用worker类中的易失性布尔标志在到达文件结束时响应信号。
然而,我无法使工作线程正确停止。这个程序的CPU使用率最后几乎为零,但是程序不会正常终止。
简化后的代码如下。我已经删除了实际的代码,并将其替换为一个简单的单词计数器。但效果是一样的。在处理完整个文件后,工作线程不会终止,并且在主线程中将布尔标志设置为true。
具有main
的类
public class MultiThreadTestEntry
{
private static String inputFileLocation = "someFile";
private static int numbOfThread = 3;
public static void main(String[] args)
{
int i = 0;
Worker[] workers = new Worker[numbOfThread];
Scanner input = GetIO.getTextInput(inputFileLocation);
String temp = null;
ExecutorService es = Executors.newFixedThreadPool(numbOfThread);
LinkedBlockingQueue<String> dataQueue = new LinkedBlockingQueue<String>(1024);
for(i = 0 ; i < numbOfThread ; i ++)
{
workers[i] = new Worker(dataQueue);
workers[i].setIsDone(false);
es.execute(workers[i]);
}
try
{
while(input.hasNext())
{
temp = input.nextLine().trim();
dataQueue.put(temp);
}
}
catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
input.close();
for(i = 0 ; i < numbOfThread ; i ++)
{
workers[i].setIsDone(true);
}
es.shutdown();
try
{
es.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
} catch (InterruptedException e)
{
Thread.currentThread().interrupt();
}
}
}
worker
类
public class Worker implements Runnable
{
private LinkedBlockingQueue<String> dataQueue = null;
private volatile boolean isDone = false;
public Worker(LinkedBlockingQueue<String> dataQueue)
{
this.dataQueue = dataQueue;
}
@Override
public void run()
{
String temp = null;
long count = 0;
System.out.println(Thread.currentThread().getName() + " running...");
try
{
while(!isDone || !dataQueue.isEmpty())
{
temp = dataQueue.take();
count = temp.length() + count;
if(count%1000 == 0)
{
System.out.println(Thread.currentThread().getName() + " : " + count);
}
}
System.out.println("Final result: " + Thread.currentThread().getName() + " : " + count);
}
catch (InterruptedException e)
{
}
}
public void setIsDone(boolean isDone)
{
this.isDone = isDone;
}
}
有什么建议吗?
正如Dan Getz已经说过的,你的工作take()
等待直到一个元素可用,但队列可能是空的。
在你的代码中,你检查Queue是否为空,但是没有什么可以阻止其他worker在检查后从元素中读取和删除一个元素。
如果队列中只有一个元素,并且t1
和t2
是两个线程可能发生以下情况:
在这种情况下,t1
将永远等待。
您可以通过使用poll
而不是take
来避免这种情况,并检查结果是否为null
public void run()
{
String temp = null;
long count = 0;
System.out.println(Thread.currentThread().getName() + " running...");
try
{
while(!isDone || !dataQueue.isEmpty())
{
temp = dataQueue.poll(2, TimeUnit.SECONDS);
if (temp == null)
// re-check if this was really the last element
continue;
count = temp.length() + count;
if(count%1000 == 0)
{
System.out.println(Thread.currentThread().getName() + " : " + count);
}
}
System.out.println("Final result: " + Thread.currentThread().getName() + " : " + count);
}
catch (InterruptedException e)
{
// here it is important to restore the interrupted flag!
Thread.currentThread().interrupt();
}
}