I'm experimenting with thread parking and decided to build some sort of service. Here is what it looks like:
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestService {
    // logback; I think this logger causes some trouble
    private static final Logger logger = LoggerFactory.getLogger(TestService.class);
    private final CountDownLatch stopLatch;
    private final Object parkBlocker = new Object();
    private volatile boolean stopped;
    private final Thread[] workers;

    public TestService(int parallelizm) {
        stopLatch = new CountDownLatch(parallelizm);
        workers = new Thread[parallelizm];
        for (int i = 0; i < parallelizm; i++) {
            workers[i] = new Thread(() -> {
                try {
                    while (!stopped) {
                        logger.debug("Parking " + Thread.currentThread().getName());
                        LockSupport.park(parkBlocker);
                        logger.debug(Thread.currentThread().getName() + " unparked");
                    }
                } finally {
                    stopLatch.countDown();
                }
            });
        }
    }

    public void start() {
        Arrays.stream(workers).forEach(t -> {
            t.start();
            logger.debug(t.getName() + " started");
        });
    }

    public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        boolean stoppedSuccefully = false;
        this.stopped = true;
        unparkWorkers();
        if (stopLatch.await(timeout, unit)) {
            stoppedSuccefully = true;
        }
        return stoppedSuccefully;
    }

    private void unparkWorkers() {
        Arrays.stream(workers).forEach(w -> {
            LockSupport.unpark(w);
            logger.debug("Un-park call is done on " + w.getName());
        });
    }
}
The problem I'm facing is that if I test this service as follows:
public static void main(String[] args) throws InterruptedException {
    while (true) {
        TestService service = new TestService(2);
        service.start();
        if (!service.stop(10000, TimeUnit.MILLISECONDS))
            throw new RuntimeException();
    }
}
I sometimes get the following behavior:
14:58:55.226 [main] DEBUG com.pack.age.TestService - Thread-648 started
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Parking Thread-648
14:58:55.227 [main] DEBUG com.pack.age.TestService - Thread-649 started
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-648
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Thread-648 unparked
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-649
14:58:55.227 [Thread-649] DEBUG com.pack.age.TestService - Parking Thread-649
Exception in thread "main" java.lang.RuntimeException
at com.pack.age.Test$.main(Test.scala:12)
at com.pack.age.Test.main(Test.scala)
The thread is left hanging, parked:
"Thread-649" #659 prio=5 os_prio=0 tid=0x00007efe4433f000 nid=0x7691 waiting on condition [0x00007efe211c8000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000720739a68> (a java.lang.Object)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at com.pack.age.TestService.lambda$new$0(TestService.java:27)
at com.pack.age.TestService$$Lambda$1/1327763628.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)
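I don't see any park/unpark race in the service. Moreover, if unpark is called before park, then park is guaranteed not to block (that's what the javadocs say).
Maybe I'm misusing LockSupport::park. Can you suggest a fix?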
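This has nothing to do with the logger, although its usage brings the problem to the surface. You have a race condition, as simple as that. Before explaining that race condition, you need to understand a few things from the LockSupport::unpark documentation first: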
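Makes available the permit for the given thread, if it was not already available. If the thread was blocked on park then it will unblock. Otherwise, its next call to park is guaranteed not to block.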
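The first point, in short: if you have a thread that has already started but has not yet called park, and within that window (between the thread's start and its park) some other thread calls unpark on it, that thread will not park at all. The permit will be available immediately. Maybe this little drawing makes it clearer: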
(ThreadA) start ------------------ park --------- ....
(ThreadB) start ----- unpark -----
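Notice that ThreadB calls unpark(ThreadA) between the moment ThreadA calls start and the moment it calls park. So when ThreadA reaches park, it is guaranteed not to block, exactly as the documentation says.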
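The "permit is already available" case can be reproduced in isolation. The following is only a minimal sketch, not part of the service above: the class name PermitDemo and the method parkAfterUnpark are made up for illustration, and the thread grants the permit to itself so that no timing between two threads is involved.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class, not taken from the question.
class PermitDemo {
    /** Makes the permit available first, then parks; returns the nanoseconds spent inside park(). */
    public static long parkAfterUnpark() {
        Thread self = Thread.currentThread();
        LockSupport.unpark(self);   // the permit becomes available before any park call
        long begin = System.nanoTime();
        LockSupport.park();         // consumes the permit and returns without blocking
        return System.nanoTime() - begin;
    }

    public static void main(String[] args) {
        long nanos = parkAfterUnpark();
        System.out.println("park() returned after "
                + TimeUnit.NANOSECONDS.toMicros(nanos) + " us");
    }
}
```

If the unpark line is removed, the same park() call would be expected to block, since nothing else in this sketch ever grants the permit.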
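The second point from the same documentation is:
This operation is not guaranteed to have any effect at all if the given thread has not been started.
Let's see that in a drawing: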
Thread B calls unpark(ThreadA) --- Thread A starts --- Thread A calls park
After ThreadA calls park, it will hang forever, because ThreadB never calls unpark again. Notice that the call to unpark was made before ThreadA started (unlike in the previous example).
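This is exactly what happens in your case: LockSupport.unpark(w); (from unparkWorkers) is called before t.start(); (from public void start(){...}). Simply put, your code calls unpark on both workers before they have even started, so when they eventually reach park they are stuck and nobody is able to unpark them. The fact that you see this with logger and not with System::out most probably has to do with what happens when you println: there is a synchronized method under the hood.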
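As a matter of fact, LockSupport offers exactly the semantics needed to prove this. For that we need a blocker class (and, for simplicity, a single worker: SOProblem service = new SOProblem(1);):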
static class ParkBlocker {
    private volatile int x;

    public ParkBlocker(int x) {
        this.x = x;
    }

    public int getX() {
        return x;
    }
}
Now we need to plug it into the proper methods. First, flag the fact that we have called unpark:
private void unparkWorkers() {
    Arrays.stream(workers).forEach(w -> {
        LockSupport.unpark(w);
        logger.debug("Un-park call is done on " + w.getName());
    });
    /*
     * add "1" to whatever there is already in pb.x, meaning
     * we have done unparking _also_
     * (pb is assumed to be a ParkBlocker field of the service;
     * its declaration is not shown here)
     */
    int y = pb.x;
    y = y + 1;
    pb.x = y;
}
Then reset the flag once a cycle has ended:
public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
    boolean stoppedSuccefully = false;
    stopped = true;
    unparkWorkers();
    if (stopLatch.await(timeout, unit)) {
        stoppedSuccefully = true;
        // reset the flag
        pb.x = 0;
    }
    return stoppedSuccefully;
}
Then change the constructor to flag that the thread has started:
.....
while (!stopped) {
    logger.debug("Parking " + Thread.currentThread().getName());
    // flag the fact that the thread has started: add "2", meaning
    // the thread has started
    int y = pb.x;
    y = y + 2;
    pb.x = y;
    LockSupport.park(pb);
    logger.debug(Thread.currentThread().getName() + " unparked");
}
Then, when your thread freezes, you need to inspect the flag:
public static void main(String[] args) throws InterruptedException {
    while (true) {
        SOProblem service = new SOProblem(1); // <-- notice a single worker, for simplicity
        service.start();
        if (!service.stop(10000, TimeUnit.MILLISECONDS)) {
            service.debug();
            throw new RuntimeException();
        }
    }
}
where the debug method is:
public void debug() {
    Arrays.stream(workers)
          .forEach(x -> {
              ParkBlocker pb = (ParkBlocker) LockSupport.getBlocker(x);
              if (pb != null) {
                  System.out.println("x = " + pb.getX());
              }
          });
}
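The LockSupport.getBlocker call that debug() relies on can also be tried on its own. What follows is a small sketch under stated assumptions: BlockerDemo and observeBlocker are hypothetical names, and the worker parks in a loop guarded by a flag so that a spurious return from park does not end it early.

```java
import java.util.concurrent.locks.LockSupport;

// Hypothetical demo class, not taken from the question or the service.
class BlockerDemo {
    private static volatile boolean released;

    /** Parks a worker on the given blocker and returns what getBlocker reports while it is parked. */
    public static Object observeBlocker(Object blocker) {
        released = false;
        Thread worker = new Thread(() -> {
            // park may return spuriously, so loop until explicitly released
            while (!released) {
                LockSupport.park(blocker);
            }
        });
        worker.start();
        try {
            Object seen;
            // getBlocker returns null for a thread that is not currently parked
            while ((seen = LockSupport.getBlocker(worker)) == null) {
                Thread.sleep(1);
            }
            released = true;
            LockSupport.unpark(worker);
            worker.join();
            return seen;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Object blocker = new Object();
        System.out.println(observeBlocker(blocker) == blocker);
    }
}
```

This is why debug() checks for null: a worker that is not parked at that moment has no blocker to report.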
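When the problem reproduces, you have called unpark before calling park; this is the case when the output is x = 3 (the 1 added in unparkWorkers plus the 2 added by the worker just before it parks).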
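As for a possible way out, the LockSupport documentation recommends calling park inside a loop that re-checks the condition being waited for. Below is a minimal sketch of a worker written that way, not a definitive fix: the class ParkingWorkers and the helper runCycles are hypothetical names, logging is left out, and keeping the flag check directly next to the park call is a deliberate assumption of this sketch.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical variant of the service, written for illustration only.
class ParkingWorkers {
    private final CountDownLatch stopLatch;
    private final Object parkBlocker = new Object();
    private volatile boolean stopped;
    private final Thread[] workers;

    public ParkingWorkers(int parallelism) {
        stopLatch = new CountDownLatch(parallelism);
        workers = new Thread[parallelism];
        for (int i = 0; i < parallelism; i++) {
            workers[i] = new Thread(() -> {
                try {
                    // The flag is re-checked right before every park call,
                    // with no other calls in between.
                    while (!stopped) {
                        LockSupport.park(parkBlocker);
                    }
                } finally {
                    stopLatch.countDown();
                }
            });
        }
    }

    public void start() {
        for (Thread t : workers) {
            t.start();
        }
    }

    public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
        stopped = true; // publish the flag before handing out permits
        for (Thread w : workers) {
            LockSupport.unpark(w);
        }
        return stopLatch.await(timeout, unit);
    }

    /** Runs start/stop cycles; returns false as soon as one stop() times out. */
    public static boolean runCycles(int cycles) {
        try {
            for (int i = 0; i < cycles; i++) {
                ParkingWorkers service = new ParkingWorkers(2);
                service.start();
                if (!service.stop(10, TimeUnit.SECONDS)) {
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("all cycles stopped: " + runCycles(1000));
    }
}
```

With this arrangement, a worker that only begins running after stop() has executed reads stopped == true and never parks, while a worker that receives its permit before parking returns from park immediately and then sees the flag.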