我有这样的代码:
public class RecursiveQueue {
//@Inject
private QueueService queueService;
public static void main(String[] args) {
RecursiveQueue test = new RecursiveQueue();
test.enqueue(new Node("X"), true);
test.enqueue(new Node("Y"), false);
test.enqueue(new Node("Z"), false);
}
private void enqueue(final Node node, final boolean waitTillFinished) {
final AtomicLong totalDuration = new AtomicLong(0L);
final AtomicInteger counter = new AtomicInteger(0);
AfterCallback callback= new AfterCallback() {
@Override
public void onFinish(Result result) {
for(Node aNode : result.getChildren()) {
counter.incrementAndGet();
queueService.requestProcess(aNode, this);
}
totalDuration.addAndGet(result.getDuration());
if(counter.decrementAndGet() <= 0) { //last one
System.out.println("Processing of " + node.toString() + " has finished in " + totalDuration.get() + " ms");
if(waitTillFinished) {
counter.notify();
}
}
}
};
counter.incrementAndGet();
queueService.requestProcess(node, callback);
if(waitTillFinished) {
try {
counter.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
想象有一个queueService
,它使用阻塞队列和少数消费者线程来处理节点=调用DAO来获取节点的子节点(它是一个树)。所以requestProcess
方法只是对节点进行排队,不阻塞。
有没有更好/安全的方法来避免在这个示例中使用wait/notify ?根据一些发现,我可以使用Phaser(但我在java 6上工作)或条件(但我不使用锁)。
-
在你的例子中没有
synchronized
任何东西。除非在synchronized(o) {...}
块中,否则不能调用o.wait()
或o.notify()
。 -
呼叫
wait()
不在循环中。这可能不会在JVM中发生,但是语言规范允许wait()
过早返回(这被称为虚假唤醒)。更一般地说,总是使用循环是一种好做法,因为它是一种熟悉的设计模式。一个while
语句的开销不会超过一个if
,而且你应该拥有它,因为有可能出现虚假唤醒,而且你绝对必须在多消费者的情况下拥有它,所以你最好总是这样写。 -
由于必须使用
synchronized
块才能使用wait()
和notify()
,因此可能没有理由使用Atomic
。 -
这个"递归"的事情看起来非常复杂,因为回调将更多的项目添加到队列中。这能有多深?
我想你正在寻找CountDownLatch。
您实际上使用了锁,或者,让我们这样说,如果您尝试使用wait/notify(如James指出的),您应该使用它们。由于您绑定到Java 1.6,并且ForkJoin或Phaser不可用于您,因此选择要么正确地实现wait/notify,要么使用带有显式锁的Condition。这取决于你的个人喜好。
另一种选择是尝试重构你的算法,这样你就可以首先了解你需要执行的全部步骤。但这并不总是可能的。