Java线程没有响应易失性布尔标志



我是Java并发的新手,我遇到了一个非常奇怪的问题:我从一个大文件中读取数据,并使用几个工作线程处理输入(一些复杂的字符串匹配任务)。我使用LinkedBlockingQueue将数据传输到工作线程,并使用worker类中的易失性布尔标志在到达文件结束时响应信号。

然而,我无法使工作线程正确停止。这个程序的CPU使用率最后几乎为零,但是程序不会正常终止。

简化后的代码如下。我已经删除了实际的代码,并将其替换为一个简单的单词计数器。但效果是一样的。在处理完整个文件后,工作线程不会终止,并且在主线程中将布尔标志设置为true。

具有main的类

public class MultiThreadTestEntry
{
    private static String inputFileLocation = "someFile";
    private static int numbOfThread = 3;
    public static void main(String[] args)
    {
        int i = 0;
        Worker[] workers = new Worker[numbOfThread];
        Scanner input = GetIO.getTextInput(inputFileLocation);
        String temp = null;
        ExecutorService es = Executors.newFixedThreadPool(numbOfThread);
        LinkedBlockingQueue<String> dataQueue = new LinkedBlockingQueue<String>(1024);
        for(i = 0 ; i < numbOfThread ; i ++)
        {
            workers[i] = new Worker(dataQueue);
            workers[i].setIsDone(false);
            es.execute(workers[i]);
        }
        try
        {
            while(input.hasNext())
            {
                temp = input.nextLine().trim();
                dataQueue.put(temp);
            }
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
        input.close();
        for(i = 0 ; i < numbOfThread ; i ++)
        {
            workers[i].setIsDone(true);
        }
        es.shutdown();
        try 
        {
            es.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
        }
    }
}

worker

public class Worker implements Runnable
{
    private LinkedBlockingQueue<String> dataQueue = null;
    private volatile boolean isDone = false;
    public Worker(LinkedBlockingQueue<String> dataQueue)
    {
        this.dataQueue = dataQueue;
    }
    @Override
    public void run()
    {
        String temp = null;
        long count = 0;
        System.out.println(Thread.currentThread().getName() + " running...");
        try
        {
            while(!isDone || !dataQueue.isEmpty())
            {
                temp = dataQueue.take();
                count = temp.length() + count;
                if(count%1000 == 0)
                {
                    System.out.println(Thread.currentThread().getName() + " : " + count);
                }
            }
            System.out.println("Final result: " + Thread.currentThread().getName() + " : " + count);
        }
        catch (InterruptedException e)
        {
        }
    }
    public void setIsDone(boolean isDone)
    {
        this.isDone = isDone;
    }
}

有什么建议吗?

正如Dan Getz已经说过的,你的工作take()等待直到一个元素可用,但队列可能是空的。

在你的代码中,你检查Queue是否为空,但是没有什么可以阻止其他worker在检查后从元素中读取和删除一个元素。

如果队列中只有一个元素,并且t1t2是两个线程可能发生以下情况:

<>之前t2.isEmpty ();//-> falset1.isEmpty ();//-> falset2.take ();//现在队列为空t1.take ();//永远等待之前

在这种情况下,t1将永远等待。

您可以通过使用poll而不是take来避免这种情况,并检查结果是否为null

public void run()
{
    String temp = null;
    long count = 0;
    System.out.println(Thread.currentThread().getName() + " running...");
    try
    {
        while(!isDone || !dataQueue.isEmpty())
        {
            temp = dataQueue.poll(2, TimeUnit.SECONDS);
            if (temp == null) 
                // re-check if this was really the last element
                continue;
            count = temp.length() + count;
            if(count%1000 == 0)
            {
                System.out.println(Thread.currentThread().getName() + " : " + count);
            }
        }
        System.out.println("Final result: " + Thread.currentThread().getName() + " : " + count);
    }
    catch (InterruptedException e)
    {
        // here it is important to restore the interrupted flag!
        Thread.currentThread().interrupt();
    }
}

最新更新