如何快照队列以避免无限循环



我有一个ConcurrentLinkedQueue,它允许从多个线程插入,但当我轮询队列时,我在一个函数中执行,并轮询直到队列为空。这可能会导致无限循环,因为在轮询时可能会有线程插入队列。如何创建队列的视图并在轮询前清空它,并且仍然是线程安全的?

我看到的一种方法是使用ConcurrentLinkedDeque并迭代,直到到达最近添加的项。您不能对单端队列执行此操作,因为读取首先查看头部,您需要读取尾部才能找到最后添加的元素。

ConcurrentLinkedDeque的工作方式是,对offer(Object)add(Object)的调用将把项目放在队列的尾部。对poll()的调用将读取队列的头,如下所示:

// Read direction --->
HEAD -> E1 -> E2 -> E3 = TAIL
// Write direction --->

当您添加更多的项目时,尾部将延伸到最后一个元素,但由于我们想像上次看到的那样清空队列,我们将获取尾部指针并迭代,直到到达尾部。然后,我们可以让后续迭代处理添加的内容,同时清空队列。我们首先peek,因为使用poll将移除最后添加的值,因此我们将无法确定何时停止移除元素,因为我们的标记被移除。

ConcurrentLinkedDeque<Object> deque = new ConcurrentLinkedDeque<>();
public void emptyCurrentView() {
Object tail = deque.peekLast();
if (tail != null) {
while (true) {
// Poll the current head
Object current = deque.poll();
// Process the element
process(current);
// If we finish processing the marker
// Exit the method
if (current == tail) {
return;
}
}
}
}

您不需要修改生产者代码,因为生产者的默认offer(Object)add(Object)与将元素添加到尾部完全相同。

如何创建队列的视图并在轮询前清空它,并且仍然是线程安全的?

是的,这听起来是一个非常糟糕的模式。使用并发队列实现的全部意义在于,您可以同时向队列添加和从队列中删除。如果你想坚持使用ConcurrentLinkedQueue,那么我会做这样的事情:

// run every so often
while (true) {
// returns null immediately if the queue is empty
Item item = blockingQueue.poll();
if (item == null) {
break;
}
// process the item...
}

但是,我会考虑改用LinkedBlockingQueue,因为它支持take()。消费者线程将处于这样的循环中:

private final BlockingQueue<Item> blockingQueue = new LinkedBlockingQueue<>();
...
while (!Thread.currentThread().isInterrupted()) {
// wait for the queue to get an item
Item item = blockingQueue.take();
// process item...
}

CCD_ 13扩展了CCD_ 14,因此CCD_。

最新更新