BlockingQueue 无法在同步的生产者/消费者方法中工作有什么意义?



当我第一次读到接口 BlockingQueue 时,我读到:如果队列中没有更多空间,生产者会阻止队列中的任何更多 put()调用。相反,如果没有要获取的项目,它会阻止方法take()。我认为它在内部的工作方式与wait()和notify()相同。例如,当内部没有更多元素要读取时,将调用 wait(),直到 Producer 再添加一个元素并调用 notify()。或者这就是我们在"旧的生产者/消费者模式"中所做的。但它在阻塞队列中不是这样工作的。如何?有什么意义?老实说,我很惊讶!

我将演示:

public class Testing {
BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(3);
synchronized void write() throws InterruptedException {
for (int i = 0; i < 6; i++) {
blockingQueue.put(i);
System.out.println("Added " + i);
Thread.sleep(1000);
}
}
synchronized void read() throws InterruptedException {
for (int i = 0; i < 6; i++) {
System.out.println("Took: " + blockingQueue.take());
Thread.sleep(3000);
}
}
}
class Test1 {
public static void main(String[] args) {
Testing testing = new Testing();
new Thread(new Runnable() {
@Override
public void run() {
try {
testing.write();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
testing.read();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

输出:

Added 0
Added 1
Added 2

"程序挂起"。

我的问题是,如果 take() 和 put() BLOCK 内部不使用 wait() 或 notify(),它们如何阻止?他们是否有一些快速燃烧 CPU 圈的 while 循环?坦率地说,我很困惑。

这是ArrayBlockingQueue#put的当前实现:

/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

您将看到,它不是使用wait()notify(),而是调用notFull.await();notFullCondition

Condition的文档如下:

条件将对象监视器方法(等待、通知和通知全部)分解为不同的对象,通过将它们与任意 Lock 实现相结合,为每个对象提供多个等待集的效果。Lock 取代了同步方法和语句的使用,条件取代了对象监视器方法的使用。

如果你通过下面的代码,你会知道生产者/消费者的问题将如何使用BlokingQueue接口得到解决。

在这里,您可以看到生产者和消费者共享了相同的队列。

从主类开始,您将同时启动线程生产者和消费者。

class Producer implements Runnable {
protected BlockingQueue blockingQueue = null;
public Producer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 6; i++) {
try {
blockingQueue.put(i);
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Added " + i);
}
}
}
class Consumer implements Runnable {
protected BlockingQueue blockingQueue = null;
public Consumer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public void run() {
for (int i = 0; i < 6; i++) {
try {
System.out.println("Took: " + blockingQueue.take());
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Test1 {
public static void main(String[] args) throws InterruptedException {
BlockingQueue queue = new ArrayBlockingQueue(3);
Producer producer = new Producer(queue);
Consumer consumer = new Consumer(queue);
new Thread(producer).start();
new Thread(consumer).start();
Thread.sleep(4000);
}
}

此代码将打印输出,例如

Took: 0
Added 0
Added 1
Added 2
Took: 1
Added 3
Added 4
Took: 2
Added 5
Took: 3
Took: 4
Took: 5

(我确信我的答案的某些或全部部分可能是您已经理解的内容,在这种情况下,请将其视为澄清:))。

1. 为什么使用阻塞队列的代码示例会出现"程序挂起"?

1.1 从概念上讲
首先,如果我们能省略一下实现级别的细节,如'wait()'、'notify()'等,从概念上讲,BlockingQueue 的 JAVA 中的所有实现都按照规范工作,即就像你说的:

'生产者阻止队列中任何更多的 put() 调用,如果它没有更多 空间。相反,如果没有,它会阻止方法 take() 要带走的物品。

因此,从概念上讲,代码示例挂起的原因是因为

1.1.1.
调用(同步)write() 的线程首先单独运行,直到 'testing.write()' 在此线程中返回,调用(同步)read() 的第二个线程将有机会运行 — 这是同一对象中"同步"方法的本质。

1.1.2.
现在,在您的示例中,从概念上讲,"testing.write()"永远不会返回,在该 for 循环中,它会将前 3 个元素"放入"队列,然后有点"旋转等待"第二个线程消耗/"获取"其中一些元素,以便它可以"放置"更多,但由于 1.1.1 中的上述原因,这永远不会发生

1.2 以编程方式

1.2.1.
(对于生产者) 在 ArrayBlockingQueue#put 中,我在 1.1.2 中提到的"旋转等待"的形式是

while (count == items.length) notFull.await();

1.2.2.
(对于消费者) 在 ArrayBlockingQueue#take 中,它调用dequeue(),而 又调用notFull.signal(),这将结束 1.2.1 中的"旋转等待">

2.现在,回到你原来帖子的标题"BlockingQueue无法在同步的生产者/消费者方法中工作有什么意义?

2.1.
如果我接受这个问题的字面意思,那么答案可能是"除了在同步方法/块中使用它们之外,还有方便的 BlockingQueue 工具存在于 JAVA 中的原因",即它们当然可以存在于任何"同步"结构之外,并促进香草生产者/消费者的实现。

2.2.
但是,如果您打算进一步询问 -为什么 JAVA BlockQueue 实现不能在同步方法/块中轻松/良好/流畅地工作?

这将是一个不同的问题,一个有效而有趣的问题,我也顺便对此感到困惑。

具体来说,请参阅这篇文章以获取更多信息(请注意,在这篇文章中,消费者线程"挂起"是因为空队列和它拥有独占锁,而不是你的情况,生产者线程"挂起"是因为 FULL 队列和它拥有独占锁;但问题的核心应该是相同的)

最新更新