服务中的停车线程



我正在尝试线程停放,并决定构建某种服务。下面是它的外观:

public class TestService {
private static final Logger logger = LoggerFactory.getLogger(TestService.class); // logback I think this logger causes some troubles
private final CountDownLatch stopLatch;
private final Object parkBlocker = new Object();
private volatile boolean stopped;
private final Thread[] workers;
public TestService(int parallelizm) {
stopLatch = new CountDownLatch(parallelizm);
workers = new Thread[parallelizm];
for (int i = 0; i < parallelizm; i++) {
workers[i] = new Thread(() -> {
try {
while (!stopped) {
logger.debug("Parking " + Thread.currentThread().getName());
LockSupport.park(parkBlocker);
logger.debug(Thread.currentThread().getName() + " unparked");
}
} finally {
stopLatch.countDown();
}
});
}
}
public void start() {
Arrays.stream(workers).forEach(t -> {
t.start();
logger.debug(t.getName() + " started");
});
}
public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
boolean stoppedSuccefully = false;
this.stopped = true;
unparkWorkers();
if (stopLatch.await(timeout, unit)) {
stoppedSuccefully = true;
}
return stoppedSuccefully;
}
private void unparkWorkers() {
Arrays.stream(workers).forEach(w -> {
LockSupport.unpark(w);
logger.debug("Un-park call is done on " + w.getName());
});
}
}

我面临的问题是,如果我按如下方式测试此服务:

public static void main(String[] args) = {
while(true) {
TestService service = new TestService(2);
service.start();
if (!service.stop(10000, TimeUnit.MILLISECONDS))
throw new RuntimeException();
}
}

我有时会遇到以下行为:

14:58:55.226 [main] DEBUG com.pack.age.TestService - Thread-648 started
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Parking Thread-648
14:58:55.227 [main] DEBUG com.pack.age.TestService - Thread-649 started
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-648
14:58:55.227 [Thread-648] DEBUG com.pack.age.TestService - Thread-648 unparked
14:58:55.227 [main] DEBUG com.pack.age.TestService - Un-park call is done on Thread-649
14:58:55.227 [Thread-649] DEBUG com.pack.age.TestService - Parking Thread-649
Exception in thread "main" java.lang.RuntimeException
at com.pack.age.Test$.main(Test.scala:12)
at com.pack.age.Test.main(Test.scala)

线程在停车场闲逛:

"Thread-649" #659 prio=5 os_prio=0 tid=0x00007efe4433f000 nid=0x7691 waiting on condition [0x00007efe211c8000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for  <0x0000000720739a68> (a java.lang.Object)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
at com.pack.age.TestService.lambda$new$0(TestService.java:27)
at com.pack.age.TestService$$Lambda$1/1327763628.run(Unknown Source)
at java.lang.Thread.run(Thread.java:748)

我在服务中没有看到任何公园取消停车的比赛。此外,如果在park之前调用unpark,则保证park不会阻塞(这就是javadocs所说的(。

也许我误用了LockSupport::park.你能提出任何解决方案吗?

这与记录器无关,尽管它的使用使问题浮出水面。您有一个竞争条件,就这么简单。在解释该争用条件之前,您需要先了解LockSupport::unpark文档中的一些内容:

为给定线程提供许可证(如果尚不可用(。 如果线程在寄存时被阻塞,那么它将取消阻塞。否则,保证其下一次停车调用不会阻止。

第一点在这里解释。简短的版本是:如果你有一个已经启动但尚未调用parkthread,并且在这段时间内(线程startpark之间(,其他一些线程调用unpark第一个线程:该线程根本不会停放。许可证将立即可用。也许这张小图就能更清楚:

(ThreadA)  start ------------------ park --------- ....
(ThreadB)  start ----- unpark -----

请注意ThreadB调用在ThreadA调用startpark之间的unpark(ThreadA)。因此,当ThreadA达到park保证不会阻止,就像文档所说的那样。

同一文档中的第二点是:

如果给定线程尚未启动,则不保证此操作有任何效果。

让我们通过图纸看看:

Thread B calls unpark(ThreadA) --- Thread A starts --- Thread A calls park 

ThreadA呼叫后park,它将永远挂起,因为ThreadB再也不会调用unpark。请注意,对unpark的调用是在ThreadA开始之前进行的(与前面的示例不同(。

这正是您的情况下发生的情况:

LockSupport.unpark(w);(从unparkWorkers开始(在从public void start(){...}t.start();之前调用。简而言之 - 您的代码在两个workers启动之前就调用了unpark,因此当它们最终到达park时 - 它们被卡住了,没有人能够unpark它们。事实上,你用logger而不是System::out看到这一点,很可能与你println时的脸有关 - 引擎盖下有一种synchronized的方法。


事实上,LockSupport提供了证明这一点所需的语义。为此,我们需要(为简单起见:SOProblem service = new SOProblem(1);(

static class ParkBlocker {
private volatile int x;
public ParkBlocker(int x) {
this.x = x;
}
public int getX() {
return x;
}
}

现在我们需要以适当的方法插入它。首先标记我们称之为unpark的事实:

private void unparkWorkers() {
Arrays.stream(workers).forEach(w -> {
LockSupport.unpark(w);
logger.debug("Un-park call is done on " + w.getName());
});
/*
* add "1" to whatever there is already in pb.x, meaning
* we have done unparking _also_
*/
int y = pb.x;
y = y + 1;
pb.x = y;
}

然后在循环结束后重置标志:

public boolean stop(long timeout, TimeUnit unit) throws InterruptedException {
boolean stoppedSuccefully = false;
stopped = true;
unparkWorkers();
if (stopLatch.await(timeout, unit)) {
stoppedSuccefully = true;
// reset the flag
pb.x = 0;
}
return stoppedSuccefully;
}

然后将构造函数更改为标记线程已启动:

.....
while (!stopped) {
logger.debug("Parking " + Thread.currentThread().getName());
// flag the fact that thread has started. add "2", meaning
// thread has started
int y = pb.x;
y = y + 2;
pb.x = y;
LockSupport.park(pb);
logger.debug(Thread.currentThread().getName() + " unparked");
}

然后,当您的线程冻结时,您需要检查标志:

public static void main(String[] args) throws InterruptedException {
while (true) {
SOProblem service = new SOProblem(1); // <-- notice a single worker, for simplicity
service.start();
if (!service.stop(10000, TimeUnit.MILLISECONDS)) {
service.debug();
throw new RuntimeException();
}
}
}

其中debug方法是:

public void debug() {
Arrays.stream(workers)
.forEach(x -> {
ParkBlocker pb = (ParkBlocker) LockSupport.getBlocker(x);
if (pb != null) {
System.out.println("x = " + pb.getX());
}
});
}

当问题再次出现时,您在调用park之前已经调用了unpark,这发生在x = 3作为输出时。

相关内容

  • 没有找到相关文章

最新更新