引用文档:
"BlockingQueue实现是线程安全的。所有排队方法都使用内部锁或其他形式的并发控制以原子方式实现其效果。但是,批量收集操作 addAll、containsAll、retainAll 和 removeAll 不一定以原子方式执行,除非在实现中另有指定。因此,例如,addAll(c) 在仅添加 c 中的某些元素后可能会失败(抛出异常)。
由于在LinkedBlockingQueue.addAll()
操作的描述中没有特别写任何内容,因此我必须假设这不是线程安全的。
您是否同意我的观点,即为了保证通过 addAll() 添加的所有元素都是连续的(即加在一起)的唯一解决方案是在每次修改队列时使用Lock
(使用 add
或 take
操作)?例如:
BlockingQueue<T> queue = new LinkedBlockingQueue<>();
Lock lock = new ReentrantLock();
//somewhere, some thread...
lock.lock();
queue.addAll(someCollection);
lock.unlock();
//somewhere else, (maybe) some other thread...
lock.lock();
queue.take();
lock.unlock();
重要更新:哇,没有人在前面的例子中看到一个大错误:由于take()
是一个阻塞操作,并且由于需要锁才能将元素添加到队列中,一旦队列为空,程序就会进入死锁状态:由于锁被take()
拥有,因此写入器无法写入, 同时,take()
将处于阻止状态,直到队列中写入某些内容(由于前面的原因,这不会发生)。知道吗?我认为最明显的一个是移除take()
周围的锁,但也许不能保证所需的addAll()
原子性。
addAll仍然是线程安全的,只是不提供原子性。因此,这取决于您的用例/期望是什么。
如果您在没有显式锁定的情况下使用 addAll,那么如果其他线程尝试写入队列(添加新元素),则无法保证添加元素的顺序,并且它们可能会混合在一起。如果这是一个问题,那么是的,您需要锁定。但是 addAll 仍然是线程安全的,队列不会损坏。
但通常情况下,队列用于在许多读取器/编写器之间提供通信方式,并且可能不需要严格保留插入顺序。
现在,主要问题是,如果队列已满,add方法会引发异常,因此addAll操作可能会在中间折叠,而您不知道添加了哪些元素,哪些元素没有添加。
如果您的用例允许等待空格来插入元素,那么您应该使用 put in a 循环。
for (E e: someCollection) queue.put(e);
这将阻塞,直到有空间添加另一个元素。
手动锁定很棘手,因为您始终必须记住在访问队列时添加锁定,这很容易出错。因此,如果您确实需要原子性,请编写一个包装类,该类实现 BlockingQUeue 接口,但在调用底层操作之前使用锁定。
我相信您将线程安全与原子混淆了。据我了解,批量操作是线程安全的,尽管不是原子的。
我认为您不需要使用外部ReentrantLock
来使您的BlockingQueue
线程安全。实际上,addAll()
是通过迭代给定Collection
并在集合的每个元素的队列上调用add()
来实现的。由于add()
是线程安全的,因此您无需同步任何内容。
当javadocs这样说时:
因此,例如,addAll(c) 在仅添加 c 中的某些元素后可能会失败(抛出异常)。
这意味着当addAll(c)
返回时,可能只添加了给定集合c
的某些元素。但是,这并不意味着您需要锁定队列才能调用take()
或任何其他操作。
编辑:
根据您的用例,您可以按照您的建议使用锁,尽管我会将其作为BlockingQueue
实现的内部,以便调用方类不需要遵守锁/call_some_method_from_the_queue/解锁模式。使用锁时我会更加小心,即在try/finally
块中使用它:
public class MyQueue<T> extends LinkedBlockingQueue<T> {
private final Lock lock = new ReentrantLock();
@Override
public boolean addAll(Collection<T> c) {
boolean r = false;
try {
this.lock.lock();
r = super.addAll(c);
} finally {
this.lock.unlock();
}
return r;
}
@Override
public void add(T e) {
try {
this.lock.lock();
super.add(e);
} finally {
this.lock.unlock();
}
}
// You don't need to lock on take(), since
// it preserves the order in which elements
// are inserted and is already thread-safe
}