当我第一次读到接口 BlockingQueue 时,我读到:如果队列中没有更多空间,生产者会阻止队列中的任何更多 put()调用。相反,如果没有要获取的项目,它会阻止方法take()。我认为它在内部的工作方式与wait()和notify()相同。例如,当内部没有更多元素要读取时,将调用 wait(),直到 Producer 再添加一个元素并调用 notify()。或者这就是我们在"旧的生产者/消费者模式"中所做的。但它在阻塞队列中不是这样工作的。如何?有什么意义?老实说,我很惊讶!
我将演示:
public class Testing {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(3);
synchronized void write() throws InterruptedException {
for (int i = 0; i < 6; i++) {
blockingQueue.put(i);
System.out.println("Added " + i);
Thread.sleep(1000);
}
}
synchronized void read() throws InterruptedException {
for (int i = 0; i < 6; i++) {
System.out.println("Took: " + blockingQueue.take());
Thread.sleep(3000);
}
}
}
class Test1 {
public static void main(String[] args) {
Testing testing = new Testing();
new Thread(new Runnable() {
@Override
public void run() {
try {
testing.write();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
testing.read();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
输出:
Added 0
Added 1
Added 2
"程序挂起"。
我的问题是,如果 take() 和 put() BLOCK 内部不使用 wait() 或 notify(),它们如何阻止?他们是否有一些快速燃烧 CPU 圈的 while 循环?坦率地说,我很困惑。
这是ArrayBlockingQueue#put
的当前实现:
/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
您将看到,它不是使用wait()
和notify()
,而是调用notFull.await();
notFull
是Condition
。
Condition
的文档如下:
条件将对象监视器方法(等待、通知和通知全部)分解为不同的对象,通过将它们与任意 Lock 实现相结合,为每个对象提供多个等待集的效果。Lock 取代了同步方法和语句的使用,条件取代了对象监视器方法的使用。
如果你通过下面的代码,你会知道生产者/消费者的问题将如何使用BlokingQueue接口得到解决。
在这里,您可以看到生产者和消费者共享了相同的队列。
从主类开始,您将同时启动线程生产者和消费者。
class Producer implements Runnable {
protected BlockingQueue blockingQueue = null;
public Producer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 6; i++) {
try {
blockingQueue.put(i);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Added " + i);
}
}
}
class Consumer implements Runnable {
protected BlockingQueue blockingQueue = null;
public Consumer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 6; i++) {
try {
System.out.println("Took: " + blockingQueue.take());
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Test1 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new ArrayBlockingQueue(3);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
}
此代码将打印输出,例如
Took: 0
Added 0
Added 1
Added 2
Took: 1
Added 3
Added 4
Took: 2
Added 5
Took: 3
Took: 4
Took: 5
(我确信我的答案的某些或全部部分可能是您已经理解的内容,在这种情况下,请将其视为澄清:))。
1. 为什么使用阻塞队列的代码示例会出现"程序挂起"?
1.1 从概念上讲
首先,如果我们能省略一下实现级别的细节,如'wait()'、'notify()'等,从概念上讲,BlockingQueue 的 JAVA 中的所有实现都按照规范工作,即就像你说的:
'生产者阻止队列中任何更多的 put() 调用,如果它没有更多 空间。相反,如果没有,它会阻止方法 take() 要带走的物品。
因此,从概念上讲,代码示例挂起的原因是因为
1.1.1.
调用(同步)write() 的线程首先单独运行,直到 'testing.write()' 在此线程中返回,调用(同步)read() 的第二个线程将有机会运行 — 这是同一对象中"同步"方法的本质。
1.1.2.
现在,在您的示例中,从概念上讲,"testing.write()"永远不会返回,在该 for 循环中,它会将前 3 个元素"放入"队列,然后有点"旋转等待"第二个线程消耗/"获取"其中一些元素,以便它可以"放置"更多,但由于 1.1.1 中的上述原因,这永远不会发生
1.2 以编程方式
1.2.1.
(对于生产者) 在 ArrayBlockingQueue#put 中,我在 1.1.2 中提到的"旋转等待"的形式是
while (count == items.length) notFull.await();
1.2.2.
(对于消费者) 在 ArrayBlockingQueue#take 中,它调用dequeue()
,而 又调用notFull.signal()
,这将结束 1.2.1 中的"旋转等待">
2.现在,回到你原来帖子的标题"BlockingQueue无法在同步的生产者/消费者方法中工作有什么意义?
2.1.
如果我接受这个问题的字面意思,那么答案可能是"除了在同步方法/块中使用它们之外,还有方便的 BlockingQueue 工具存在于 JAVA 中的原因",即它们当然可以存在于任何"同步"结构之外,并促进香草生产者/消费者的实现。
2.2.
但是,如果您打算进一步询问 -为什么 JAVA BlockQueue 实现不能在同步方法/块中轻松/良好/流畅地工作?
这将是一个不同的问题,一个有效而有趣的问题,我也顺便对此感到困惑。
具体来说,请参阅这篇文章以获取更多信息(请注意,在这篇文章中,消费者线程"挂起"是因为空队列和它拥有独占锁,而不是你的情况,生产者线程"挂起"是因为 FULL 队列和它拥有独占锁;但问题的核心应该是相同的)