我写了一个在生产者-消费者模型中使用SynchronousQueue的测试示例。但它效果不佳。以下是我的代码:
public class QueueTest {
String input;
int pos;
BlockingQueue<String> queue;
volatile boolean exitFlag;
QueueTest()
{
for(int i=0; i<10000; i++)
input += "abcde";
input += "X";
pos = 0;
queue = new SynchronousQueue<String>();
exitFlag = false;
}
public static void main(String[] args) {
QueueTest qtest = new QueueTest();
qtest.runTest();
}
void runTest()
{
Thread producer = new Thread( new Producer());
Thread consumer = new Thread( new Consumer());
producer.start();
consumer.start();
try {
producer.join();
consumer.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
class Producer implements Runnable
{
public void run()
{
while(true)
{
String s = read();
if(s.equals("X"))
break;
try {
queue.put(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
exitFlag = true;
}
}
class Consumer implements Runnable
{
public void run()
{
while(exitFlag == false)
{
String s = null;
try {
s = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
process(s);
}
}
}
String read()
{
String str = input.substring(pos, pos+1);
pos++;
return str;
}
void process(String s)
{
long sum = 0;
for(long i=0; i<1000; i++)
sum = sum * i + i;
}
}
问题是运行像死锁一样卡住了。这些简单的代码中是否有任何错误?
您更有可能看到竞争条件。 想象场景
Thread 1 put into queue
Thread 2 takes out of queue quickly processes and awaits another put from thread 1
Thread 1 finishes and sets exitFlag to true
在这种情况下,线程 2 将永久存在,因为在线程 2 从中读取之前,exitFlag 未设置为 false。
您可能需要考虑毒丸。 这是一条发送给我们已经完成的其他线程的消息。 例如:
final String POISON_PILL = " POISON";
class Producer implements Runnable {
public void run() {
while (true) {
String s = read();
if (s.equals("X"))
break;
try {
queue.put(s);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
try {
queue.put(POISON_PILL);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
class Consumer implements Runnable {
public void run() {
String s = null;
try {
while ((s = queue.take()) != POISON_PILL) {
process(s);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
因此,当另一个线程收到通知时,另一个线程已完成两个线程都应正常结束。
由于您的 exitFlag 在多个线程之间共享,因此您必须执行一些操作以使生产者的更新对消费者可见(就 java 内存模型而言)。 对于此示例,使值可变就足够了。
更新:
您应该生成挂起代码的堆栈转储。 这将为您提供有关正在发生的事情的线索。 这段代码是一个很好的例子,说明为什么你不应该将标志与 BlockingQueue 一起使用。
更新 2:
无赖,@JohnVint把猫从袋子里放出来。 是的,毒丸是解决这种竞争条件的方法。
您的程序将卡在以下情况下:
生产者在消费者检查 existFlag 是否为 true 之后设置 exitFlag(不放置新元素)。如果队列中没有更多的元素(消费者之前设法处理了所有元素),消费者将在 queue.take() 上被阻止。
您可以使用 queue.poll(),它不是阻塞方法。这需要稍微改变你的程序。