当我偶然发现一些我不理解的行为时,我试图使用 Java 同步"原语"(syncd, wait(), notify()) 实现类似于 Java 的有界 BlockingQueue 接口的东西。
我创建了一个能够存储 1 个元素的队列,创建两个等待从队列中获取值的线程,启动它们,然后尝试将两个值放入主线程同步块中的队列中。大多数情况下它可以工作,但有时等待值的两个线程开始似乎相互唤醒并且不让主线程进入同步块。
这是我的(简化)代码:
import java.util.LinkedList;
import java.util.Queue;
public class LivelockDemo {
private static final int MANY_RUNS = 10000;
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < MANY_RUNS; i++) { // to increase the probability
final MyBoundedBlockingQueue ctr = new MyBoundedBlockingQueue(1);
Thread t1 = createObserver(ctr, i + ":1");
Thread t2 = createObserver(ctr, i + ":2");
t1.start();
t2.start();
System.out.println(i + ":0 ready to enter synchronized block");
synchronized (ctr) {
System.out.println(i + ":0 entered synchronized block");
ctr.addWhenHasSpace("hello");
ctr.addWhenHasSpace("world");
}
t1.join();
t2.join();
System.out.println();
}
}
public static class MyBoundedBlockingQueue {
private Queue<Object> lst = new LinkedList<Object>();;
private int limit;
private MyBoundedBlockingQueue(int limit) {
this.limit = limit;
}
public synchronized void addWhenHasSpace(Object obj) throws InterruptedException {
boolean printed = false;
while (lst.size() >= limit) {
printed = __heartbeat(':', printed);
notify();
wait();
}
lst.offer(obj);
notify();
}
// waits until something has been set and then returns it
public synchronized Object getWhenNotEmpty() throws InterruptedException {
boolean printed = false;
while (lst.isEmpty()) {
printed = __heartbeat('.', printed); // show progress
notify();
wait();
}
Object result = lst.poll();
notify();
return result;
}
// just to show progress of waiting threads in a reasonable manner
private static boolean __heartbeat(char c, boolean printed) {
long now = System.currentTimeMillis();
if (now % 1000 == 0) {
System.out.print(c);
printed = true;
} else if (printed) {
System.out.println();
printed = false;
}
return printed;
}
}
private static Thread createObserver(final MyBoundedBlockingQueue ctr,
final String name) {
return new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println(name + ": saw " + ctr.getWhenNotEmpty());
} catch (InterruptedException e) {
e.printStackTrace(System.err);
}
}
}, name);
}
}
这是我在它"阻止"时看到的:
(skipped a lot)
85:0 ready to enter synchronized block
85:0 entered synchronized block
85:2: saw hello
85:1: saw world
86:0 ready to enter synchronized block
86:0 entered synchronized block
86:2: saw hello
86:1: saw world
87:0 ready to enter synchronized block
............................................
..........................................................................
..................................................................................
(goes "forever")
但是,如果我更改addWhenHasSpace和getWhenNotEmpty方法的while(...)循环中的notify()调用来通知All(),它"总是"通过。
我的问题是:为什么在这种情况下,notify() 和 notifyAll() 方法之间的行为会有所不同,以及为什么 notify() 的行为是这样的?
我希望这两种方法在这种情况下都以相同的方式运行(两个线程等待,一个线程阻塞),因为:
- 在我看来,在这种情况下,通知All()只会唤醒另一个线程,与notify()相同; 看起来唤醒线程
- 的方法的选择会影响被唤醒的线程(我猜是可运行的)和主线程(已被阻止)后来如何争夺锁——这不是我期望从 javadoc 以及搜索互联网上关于这个主题的东西。
或者也许我完全做错了什么?
您的代码的情况下,我可以看到您正在使用单个条件变量来实现具有一个生产者和多个使用者的队列。 这是一个麻烦的秘诀:如果只有一个条件变量,那么当一个消费者调用notify()
时,没有办法知道它会唤醒生产者还是唤醒另一个消费者。
方法可以摆脱这个陷阱:最简单的方法是始终使用notifyAll().
另一种方法是停止使用 synchronized
、wait()
和 notify()
,而是使用 java.util.concurrent.locks 中的工具。
单个 ReentrantLock 对象可以为您提供两个(或更多)条件变量。 一个专用于生产者通知消费者,另一个专门用于消费者通知生产者。
注意:切换到使用重入锁时,名称会发生变化:o.wait()
变为c.await()
,o.notify()
变为c.signal()
。
似乎有某种公平/驳船正在使用内在锁定 - 可能是由于一些优化。 我猜,本机代码会检查当前线程是否已通知它即将等待的监视器并允许它获胜。
将synchronized
替换为ReentrantLock
,它应该按预期工作。这里的不同之处在于ReentrantLock
如何处理它已通知的锁的服务员。
更新:
有趣的发现在这里。 你所看到的是进入main
线程之间的竞赛
synchronized (ctr) {
System.out.println(i + ":0 entered synchronized block");
ctr.addWhenHasSpace("hello");
ctr.addWhenHasSpace("world");
}
而其他两个线程进入各自的synchronized
区域。 如果主线程在两者中的至少一个之前没有进入其同步区域,您将体验到您所描述的此实时锁定输出。
似乎正在发生的事情是,如果两个消费者线程都先点击同步块,它们将相互乒乓球notify
和wait
。 可能是这样一种情况,JVM 在线程被阻塞时将等待的线程优先权提供给监视器。