java并发:基于BlockingQueue的生产者/消费者似乎不能很好地处理需要同步代码块的复合操作



当我试图用基于BlockingQueue的生产者/消费者模式实现一些复合操作时,我感到很惊讶,这让我觉得我很可能错过了一些明显的东西。


1.简而言之

我需要

  • 我的使用者以"从队列中获取obj+在obj上执行更多使用者操作"原子和
  • 我的生产者以"向队列提供obj"的形式进行序列操作,并在obj原子和
  • 显然,上述两个原子序列在同一物体上同步

如果没有这样的原子性,可能会出现问题,请参阅">问题!!">'作为下面第2节中生产者代码注释中的一个示例。

但我不能简单地在take()的调用及其相关的消费者操作周围放置一个同步块,当队列为空时,这个消费者将永远被困在那里,因为当它等待生产者用obj填充队列时,它仍然拥有同步锁,消费者的同步锁占有将反过来阻止生产者进入相应的关键区域进行任何"生产"。

2.特别地,简化的示例代码如下:

生产者和消费者类已知的通用代码:

Queue<QObj> nbq = new ConcurrentLinkedQueue();
BlockingQueue<QObj> bq = new LinkedBlockingQueue<>();
List<String> idList = new LinkedList<>();
Object lockObj = idList;
int Idx = 1;
public static class QObj {
public String id;
public String content;
public QObj(String id, String content) {
this.id = id;
this.content = content;
}
}

生产者类中的主要逻辑:

public void produceBlocking() {
QObj o = new QObj(String.valueOf(Idx), "Content_" + Idx++);
//        synchronized(lockObj) {
//        no point to include Queue.offer(...) call in a synchronized block as we
//        won't be able to use synchronized() in corresponding consumer anyway
//        for the reason described above
bq.offer(o);
synchronized (lockObj) {
// PROBLEM!! by now, 'o' could have been 'consumed' already
//  hence we shouldn't do the following operations:
// do the associated part of compound action of 'producer'
idList.add(o.id);
// do some more operation as part of this compound action ...
}
//        }
}

消费者类中的主要逻辑:

public void consumeBlocking() {
while (true) {
try {
//                synchronized (lockObj) {
//                can't simply put synchronized() here to make the following compound action atomic
//                  - when the queue is empty, this consumer will be stuck here forever since it still possesses
//                      the lockObj, which stops the producer from entering the critical region to do any 'producing'
QObj o = bq.take();
synchronized (lockObj) {
// do the associated part of compound action of 'consumer'
idList.remove(o.id);
// do some more operation as part of this compound action ...
}
//                }
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

3.为什么这不是一个常见的问题

我觉得当人们使用BlockingQueue时,这一定是一个常见的问题,而我无法真正找到任何直接解决类似问题的东西,这一事实证实了我的信念,即我可能有根本性的错误。

有人能给我一些直接解决方案的提示吗?或者指出我对这个问题的错误看法吗?

4.备选方案

我确实想过一些替代方案,但我觉得没有一个直接解决这个问题,而且都有一些缺点(如代码评论中突出显示的">DRAWBACK!!")

4.1-在继续之前,请使用Queue.contains()进行检查

public void produceBlockingWithCheck() {
QObj o = new QObj(String.valueOf(Idx), "Content_" + Idx++);
bq.offer(o);
synchronized (lockObj) {
// First, Check if the obj could have already been consumed
// DRAWBACK!!: this could be very costly, e.g.
//  when 'bq' is a LinkedBlockingQueue, and contains(...) always triggers
//  a sequential traversal, the Queue itself can be very large
if (bq.contains(o)) {
// do the associated part of compound action of 'producer'
idList.add(o.id);
// do some more operation as part of this compound action ...
}
}
}

4.2-调整生产者上的操作顺序,将Queue.offer()调用移动到最后的

public void produceBlockingOrderAdjusted() {
QObj o = new QObj(String.valueOf(Idx), "Content_" + Idx++);
// do the associated part of compound action of 'producer', only before
// calling BlockingQueue.offer(...)
//  DRAWBACK!!: even this may work for this simple case, such order adjustment
//  won't not be logically possible for all cases, will it?
synchronized (lockObj) {
idList.add(o.id);
// do some more operation as part of this compound action ...
}
bq.offer(o);
}

4.3-请改用非阻塞队列。


public void produceNonBlocking() {
QObj o = new QObj(String.valueOf(Idx), "Content_" + Idx++);
synchronized(lockObj) {
nbq.offer(o);
// do the associated part of compound action of 'producer'
idList.add(o.id);
// do some more operation as part of this compound action ...
}
}
public void consumeNonBlocking() {
while (true) {
synchronized (lockObj) {
// kind of doing our own blocking.
QObj o = nbq.poll();
if (o != null) {
// do the associated part of compound action of 'consumer'
idList.add(o.id);
// do some more operation as part of this compound action ...
}
// DRAWBACK!!: if the 'producers' don't produce faster than the 'consumers' consuming,
// this 'miss' could be happening too often and get costly
}
}
}

为什么这不是一个常见的问题?

多线程就像旧的棋盘游戏,"奥赛罗;其与标签线一起销售;学习一分钟,掌握一生"现代线程库使编写多线程代码变得容易,但设计有效使用多线程的算法并不容易有时,作为高效单线程算法基础的相同设计原则可能完全不适合在多线程代码中使用。

一个有经验的设计者知道,当线程A将某个对象放入队列中时;消耗的";对于线程B,最好让线程A永远处理该对象。只需将对象从队列中取出就足以让线程B独占使用它。如果你不能在不增加设计复杂性的情况下做到这一点,。。。好吧,这就是使用多线程所要付出的代价。

如果在八核机器上运行,那么效率只有单线程实现一半的多线程并行计算仍然可以以四倍的速度运行。

我需要

  • 我的使用者以"从队列中获取obj+在obj上执行更多使用者操作"原子和
  • 我的生产者以"向队列提供obj"的形式进行序列操作,并在obj原子和
  • 显然,上述两个原子序列在同一物体上同步

您可以使用wait+notifyAll
试着阅读这篇文章:对CCD_ 3进行了详细的说明。

但我不能简单地在take()的调用及其相关的消费者操作周围放置一个同步块,因为当队列为空时,这个消费者将永远被困在那里,因为它在等待生产者用obj填充队列时仍将拥有同步锁,消费者的同步锁占有将反过来阻止生产者进入相应的关键区域进行任何"生产"。

wait+notifyAll解决了这个问题,因为在wait()内部等待的线程释放了锁(稍后当wait()需要返回时,线程再次获得锁)。

您还可以查看Condition javadocs
Conditionwait+notify的概念相同,但用于Lock接口(synchronized的更灵活、更强大的版本)
再看一下javadocs中的BoundedBuffer示例,它似乎可以修改为在代码中执行您想要的操作。