Application using ReentrantLock conditions sometimes hangs

I have the following code snippet:

public class ConditionTest {
    public static final ReentrantLock reentrantLock = new ReentrantLock();
    public static final Condition CONDITION_PRODUCED = reentrantLock.newCondition();
    public static final Condition CONDITION_RECEIVED = reentrantLock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        Thread receiverThread = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                reentrantLock.lock();
                try {
                    CONDITION_PRODUCED.await();
                    System.out.println("Received");
                    CONDITION_RECEIVED.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                reentrantLock.unlock();
            }
        });
        Thread senderThread = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                reentrantLock.lock();
                if (i != 0) {
                    try {
                        CONDITION_RECEIVED.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                System.out.println("Produced");
                CONDITION_PRODUCED.signal();
                reentrantLock.unlock();
            }
        });
        receiverThread.setName("received");
        senderThread.setName("Producer");
        receiverThread.start();
        Thread.sleep(500);
        senderThread.start();
    }
}

Sometimes it works correctly and I see the expected output. But sometimes it goes wrong and hangs after printing:

Produced
Received

Thread dump:

2018-03-14 14:47:58
Full thread dump Java HotSpot(TM) 64-Bit Server VM (25.111-b14 mixed mode):
"JMX server connection timeout 18" #18 daemon prio=5 os_prio=0 tid=0x000000001e149800 nid=0x16ac in Object.wait() [0x000000001fb1f000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x000000076c76d070> (a [I)
    at com.sun.jmx.remote.internal.ServerCommunicatorAdmin$Timeout.run(ServerCommunicatorAdmin.java:168)
    - locked <0x000000076c76d070> (a [I)
    at java.lang.Thread.run(Thread.java:745)
   Locked ownable synchronizers:
    - None
"RMI Scheduler(0)" #17 daemon prio=5 os_prio=0 tid=0x000000001e15d000 nid=0x2f3c waiting on condition [0x000000001fa1e000]
   java.lang.Thread.State: TIMED_WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076c438500> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2078)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1093)
    at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:809)
    at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1067)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
   Locked ownable synchronizers:
    - None
"RMI TCP Connection(1)-192.168.56.1" #16 daemon prio=5 os_prio=0 tid=0x000000001e3b8800 nid=0x1674 runnable [0x000000001f91e000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:170)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    - locked <0x000000076c6f8d18> (a java.io.BufferedInputStream)
    at java.io.FilterInputStream.read(FilterInputStream.java:83)
    at sun.rmi.transport.tcp.TCPTransport.handleMessages(TCPTransport.java:550)
    at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(TCPTransport.java:826)
    at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.lambda$run$0(TCPTransport.java:683)
    at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler$$Lambda$3/342486007.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(TCPTransport.java:682)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
   Locked ownable synchronizers:
    - <0x000000076c469b90> (a java.util.concurrent.ThreadPoolExecutor$Worker)
"RMI TCP Accept-0" #15 daemon prio=5 os_prio=0 tid=0x000000001e065800 nid=0x20e8 runnable [0x000000001f71f000]
   java.lang.Thread.State: RUNNABLE
    at java.net.DualStackPlainSocketImpl.accept0(Native Method)
    at java.net.DualStackPlainSocketImpl.socketAccept(DualStackPlainSocketImpl.java:131)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.PlainSocketImpl.accept(PlainSocketImpl.java:199)
    - locked <0x000000076c4402e8> (a java.net.SocksSocketImpl)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(LocalRMIServerSocketFactory.java:52)
    at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(TCPTransport.java:400)
    at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(TCPTransport.java:372)
    at java.lang.Thread.run(Thread.java:745)
   Locked ownable synchronizers:
    - None
"DestroyJavaVM" #13 prio=5 os_prio=0 tid=0x0000000002d1b000 nid=0x2c54 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE
   Locked ownable synchronizers:
    - None
"Producer" #12 prio=5 os_prio=0 tid=0x000000001e3d3800 nid=0x24b0 waiting on condition [0x000000001ef1e000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076b80c210> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at com.cryptex.fix.performance.ConditionTest.lambda$main$1(ConditionTest.java:32)
    at com.cryptex.fix.performance.ConditionTest$$Lambda$2/326549596.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)
   Locked ownable synchronizers:
    - None
"received" #11 prio=5 os_prio=0 tid=0x000000001e3d2000 nid=0x1a18 waiting on condition [0x000000001ee1e000]
   java.lang.Thread.State: WAITING (parking)
    at sun.misc.Unsafe.park(Native Method)
    - parking to wait for  <0x000000076b80c1f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
    at com.cryptex.fix.performance.ConditionTest.lambda$main$0(ConditionTest.java:18)
    at com.cryptex.fix.performance.ConditionTest$$Lambda$1/1642360923.run(Unknown Source)
    at java.lang.Thread.run(Thread.java:745)
   Locked ownable synchronizers:
    - None
"Service Thread" #10 daemon prio=9 os_prio=0 tid=0x000000001e050800 nid=0x2a10 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE
   Locked ownable synchronizers:
    - None
"C1 CompilerThread2" #9 daemon prio=9 os_prio=2 tid=0x000000001df1d800 nid=0x2ea8 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE
   Locked ownable synchronizers:
    - None
"C2 CompilerThread1" #8 daemon prio=9 os_prio=2 tid=0x000000001df1d000 nid=0x136c waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE
   Locked ownable synchronizers:
    - None
"C2 CompilerThread0" #7 daemon prio=9 os_prio=2 tid=0x000000001df1c000 nid=0x2a98 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE
   Locked ownable synchronizers:
    - None
"Monitor Ctrl-Break" #6 daemon prio=5 os_prio=0 tid=0x000000001e023800 nid=0x2f58 runnable [0x000000001e81e000]
   java.lang.Thread.State: RUNNABLE
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:170)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
    at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
    at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
    - locked <0x000000076b91a460> (a java.io.InputStreamReader)
    at java.io.InputStreamReader.read(InputStreamReader.java:184)
    at java.io.BufferedReader.fill(BufferedReader.java:161)
    at java.io.BufferedReader.readLine(BufferedReader.java:324)
    - locked <0x000000076b91a460> (a java.io.InputStreamReader)
    at java.io.BufferedReader.readLine(BufferedReader.java:389)
    at com.intellij.rt.execution.application.AppMainV2$1.run(AppMainV2.java:64)
   Locked ownable synchronizers:
    - None
"Attach Listener" #5 daemon prio=5 os_prio=2 tid=0x000000001c4db800 nid=0x2f48 waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE
   Locked ownable synchronizers:
    - None
"Signal Dispatcher" #4 daemon prio=9 os_prio=2 tid=0x000000001c4da000 nid=0x10f8 runnable [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE
   Locked ownable synchronizers:
    - None
"Finalizer" #3 daemon prio=8 os_prio=1 tid=0x000000001c4c0000 nid=0x228c in Object.wait() [0x000000001d82f000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x000000076b508e98> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:143)
    - locked <0x000000076b508e98> (a java.lang.ref.ReferenceQueue$Lock)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:164)
    at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:209)
   Locked ownable synchronizers:
    - None
"Reference Handler" #2 daemon prio=10 os_prio=2 tid=0x0000000002e07000 nid=0x1204 in Object.wait() [0x000000001d72f000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    - waiting on <0x000000076b506b40> (a java.lang.ref.Reference$Lock)
    at java.lang.Object.wait(Object.java:502)
    at java.lang.ref.Reference.tryHandlePending(Reference.java:191)
    - locked <0x000000076b506b40> (a java.lang.ref.Reference$Lock)
    at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:153)
   Locked ownable synchronizers:
    - None
"VM Thread" os_prio=2 tid=0x000000001c497800 nid=0x2784 runnable 
"GC task thread#0 (ParallelGC)" os_prio=0 tid=0x0000000002d2e800 nid=0x848 runnable 
"GC task thread#1 (ParallelGC)" os_prio=0 tid=0x0000000002d30800 nid=0x20cc runnable 
"GC task thread#2 (ParallelGC)" os_prio=0 tid=0x0000000002d32000 nid=0x1c5c runnable 
"GC task thread#3 (ParallelGC)" os_prio=0 tid=0x0000000002d33800 nid=0x20bc runnable 
"VM Periodic Task Thread" os_prio=2 tid=0x000000001e13d000 nid=0x2dc waiting on condition 
JNI global references: 350

What am I doing wrong?

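From Java Concurrency in Practice:

Condition wait errors. When waiting on a condition queue, Object.wait or Condition.await should be called in a loop, with the appropriate lock held, after testing some state predicate (see Chapter 14). Calling Object.wait or Condition.await without the lock held, not in a loop, or without testing some state predicate is almost certainly an error.

Since you don't do this, you can run into a missed signal or, less likely but still possible, a spurious wakeup. Either one breaks the synchronization between the two threads.

A possible correction is: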

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionTest {
    public static final ReentrantLock reentrantLock = new ReentrantLock();
    public static final Condition CONDITION_PRODUCED = reentrantLock.newCondition();
    public static final Condition CONDITION_RECEIVED = reentrantLock.newCondition();
    // Guarded by reentrantLock. true: the producer may produce; false: an item is waiting for the receiver.
    private static boolean state = true;

    public static void main(String[] args) throws InterruptedException {
        Thread receiverThread = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    reentrantLock.lock();
                    try {
                        // Re-check the predicate after every wakeup.
                        while (state) {
                            CONDITION_PRODUCED.await();
                        }
                        state = true;
                        System.out.println("Received");
                        CONDITION_RECEIVED.signal();
                    } finally {
                        // Release the lock even if await() is interrupted.
                        reentrantLock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread senderThread = new Thread(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    reentrantLock.lock();
                    try {
                        while (!state) {
                            CONDITION_RECEIVED.await();
                        }
                        state = false;
                        System.out.println("Produced");
                        CONDITION_PRODUCED.signal();
                    } finally {
                        reentrantLock.unlock();
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        receiverThread.setName("received");
        senderThread.setName("Producer");
        receiverThread.start();
        senderThread.start();
    }
}

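The problem occurs when one thread signals a condition before the other thread has started waiting on it. In the hang shown in the question, the receiver calls CONDITION_RECEIVED.signal() before the producer has reached CONDITION_RECEIVED.await(); the same can happen the other way round, with the producer signalling CONDITION_PRODUCED before the consumer has reached CONDITION_PRODUCED.await(). In that case the signal is "lost", and the two threads end up waiting for each other.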
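The lost signal can be reproduced deterministically in a single thread. The following is a minimal sketch, not part of the original code: the class name LostSignalDemo and the method signalThenAwait are made up for illustration. It signals a condition while nobody is waiting and then waits on the same condition with a timeout, so that the example terminates instead of hanging.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class, not taken from the question.
public class LostSignalDemo {
    private static final ReentrantLock lock = new ReentrantLock();
    private static final Condition condition = lock.newCondition();

    /**
     * Signals first, then waits with a timeout.
     * Returns true if the wait ended before the timeout elapsed.
     */
    static boolean signalThenAwait(long timeoutMillis) throws InterruptedException {
        lock.lock();
        try {
            // No thread is waiting yet, so this signal has no effect and is not remembered.
            condition.signal();
        } finally {
            lock.unlock();
        }

        lock.lock();
        try {
            // await(time, unit) returns false if the waiting time detectably elapsed.
            return condition.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("woken by signal: " + signalThenAwait(200));
    }
}
```

With the ReentrantLock implementation the wait is expected to time out, because a Condition does not store signals. An untimed await() in the same position would block forever, which is what the thread dump in the question shows.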

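You could handle this with a boolean flag shared by the two threads, but I don't recommend it: it is error-prone, hard to read and debug, and hard to scale, because the producer and the consumer become tightly coupled.

The usual producer/consumer pattern involves queues. They give you a design that is easier to understand, more loosely coupled, and easier to scale.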

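import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

BlockingQueue<Integer> produced = new LinkedBlockingQueue<>();
BlockingQueue<Integer> requests = new LinkedBlockingQueue<>();
Thread receiverThread = new Thread(() -> {
    int i = 1;
    requests.offer(i); // ask for the first item
    while (i < 10) {
        try {
            int received = produced.take(); // blocks until the sender has produced something
            System.out.println("Received: " + received);
            i = received + 1;
            requests.offer(i); // ask for the next item
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag and stop.
            Thread.currentThread().interrupt();
            return;
        }
    }
});
Thread senderThread = new Thread(() -> {
    while (true) {
        try {
            int i = requests.take(); // blocks until the receiver asks for an item
            System.out.println("Produced: " + i);
            produced.offer(i);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag and stop.
            Thread.currentThread().interrupt();
            return;
        }
    }
});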
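For comparison, here is a self-contained sketch of the queue approach with a single bounded queue instead of a request/response pair. The class name QueueHandoffDemo, the method run, and the capacity of 1 are assumptions chosen for illustration; they do not come from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical demo class, not taken from the question.
public class QueueHandoffDemo {
    /** Runs one producer and one consumer; returns the items in the order the consumer received them. */
    static List<Integer> run(int count) {
        // Capacity 1: put() blocks until the consumer has taken the previous item.
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        List<Integer> received = new ArrayList<>(); // written only by the consumer thread

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");

        consumer.start();
        producer.start();
        try {
            // join() also makes the consumer's writes to the list visible to the caller.
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(10));
    }
}
```

The start order of the two threads does not matter here: the queue holds the item until it is taken, so nothing comparable to a lost signal can occur, and no Thread.sleep is needed.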

Never call wait()/await() without a loop.
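The same rule applies to intrinsic locks. As a minimal sketch (the class GuardedFlag and its method names are invented for this example), the waiting side tests a state predicate in a loop while holding the monitor, and the notifying side changes that state before calling notifyAll():

```java
// Hypothetical demo class, not taken from the question.
public class GuardedFlag {
    // State predicate, guarded by the intrinsic lock of this object.
    private boolean ready = false;

    public synchronized void markReady() {
        ready = true;
        notifyAll(); // wake up every thread blocked in awaitReady()
    }

    public synchronized void awaitReady() throws InterruptedException {
        // Loop: returns at once if the flag is already set, and goes back
        // to waiting after a spurious wakeup.
        while (!ready) {
            wait();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        GuardedFlag flag = new GuardedFlag();
        flag.markReady();  // the notification arrives before anyone waits
        flag.awaitReady(); // does not block, because the predicate is checked first
        System.out.println("done");
    }
}
```

Because the notification is recorded in the flag, it does not matter whether markReady() runs before or after awaitReady() starts.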

@bowmore's solution does not follow the typical sequence of the producer/consumer pattern.

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionTest {
    public static final ReentrantLock reentrantLock = new ReentrantLock();
    public static final Condition CONDITION_PRODUCED = reentrantLock.newCondition();
    public static final Condition CONDITION_RECEIVED = reentrantLock.newCondition();
    // Guarded by reentrantLock. true while a produced item has not been received yet.
    private static boolean dataAvailable = false;

    public static void main(String[] args) throws InterruptedException {
        Thread receiverThread = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                reentrantLock.lock();
                try {
                    // Wait until the producer has made data available.
                    while (!dataAvailable) {
                        CONDITION_PRODUCED.await();
                    }
                    dataAvailable = false;
                    System.out.println("Received " + i);
                    CONDITION_RECEIVED.signal();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    // Release the lock even if await() is interrupted.
                    reentrantLock.unlock();
                }
            }
        });
        Thread senderThread = new Thread(() -> {
            for (int i = 0; i < 100; i++) {
                reentrantLock.lock();
                try {
                    // Wait until the previous item has been received.
                    while (dataAvailable) {
                        CONDITION_RECEIVED.await();
                    }
                    dataAvailable = true;
                    System.out.println("Produced " + i);
                    CONDITION_PRODUCED.signal();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    reentrantLock.unlock();
                }
            }
        });
        receiverThread.setName("received");
        senderThread.setName("Producer");
        receiverThread.start();
        Thread.sleep(50);
        senderThread.start();
    }
}
