PriorityBlockingQueue,确保连续的项目顺序



我正在接收一系列消息,并希望按照序列号的顺序依次处理它们。每条消息都带有一个序列号,由一个线程池负责接收,因此消息到达的顺序可能是乱的。我想把它们放进一个类似 PriorityBlockingQueue 的阻塞队列中,并按正确的顺序读取:读取操作会一直阻塞,直到下一个连续序列号的消息可用为止。

例如,给定以下代码:

// The elements are their own sequence numbers, so Integer::intValue serves as the
// index function required by the ConsecutiveBlockingQueue constructors.
ConsecutiveBlockingQueue<Integer> q = new ConsecutiveBlockingQueue<>(Integer::intValue);
// Two producer threads hand over the items in an interleaved, non-sequential order.
new Thread(() -> { q.put(0); q.put(2); }).start();
new Thread(() -> { q.put(1); q.put(3); }).start();
ArrayList<Integer> ordered = new ArrayList<>(4);
for (int i = 0; i < 4; i++) {
    // take() blocks until the next consecutive item has been released by the queue.
    ordered.add(q.take());
}
System.out.println(ordered);

我希望它打印 [0, 1, 2, 3]。

这是一个经过最低限度测试的类,似乎可以做我想做的事情。欢迎评论。

package com.ciphercloud.sdp.common;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.PriorityQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.ToIntFunction;
/**
 * An unbounded blocking queue that releases its elements strictly in consecutive
 * index order, regardless of the order in which they were offered.
 *
 * <p>Every element is mapped to an {@code int} index by the function supplied at
 * construction time. An offered element whose index equals the next expected index is
 * made available to consumers immediately; an element with a higher index is buffered
 * until all of its predecessors have arrived. Elements whose index has already been
 * released (stale elements or duplicates) are discarded, because keeping them would
 * block the buffered elements behind them forever.
 *
 * <p>{@link #size()}, {@link #iterator()}, {@link #peek()}, {@code poll}, {@code take}
 * and {@code drainTo} only see elements that have already been released; elements that
 * are still buffered while waiting for a predecessor are not visible through them.
 *
 * <p>All insertions are serialized by an internal {@link ReentrantLock}; retrieval is
 * delegated to a {@link LinkedBlockingQueue}.
 *
 * <p>NOTE(review): the expected index is a plain {@code int} incremented once per
 * released element; sequences that run past {@code Integer.MAX_VALUE} are not handled.
 *
 * @param <E> the type of elements held in this queue
 */
public class ConsecutiveBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E> {

    /** Maps an element to its sequence index. */
    private final ToIntFunction<E> ixFunction;

    /** Released, consecutively ordered elements; all retrieval operations read from here. */
    final LinkedBlockingQueue<E> bQueue = new LinkedBlockingQueue<>();

    /** Buffer for elements that arrived ahead of their turn, ordered by sequence index. */
    final PriorityQueue<E> pQueue;

    /** Guards {@link #pQueue} and {@link #nextIx} and serializes insertions into {@link #bQueue}. */
    final ReentrantLock lock = new ReentrantLock();

    /** Index of the next element that may be released to consumers. */
    private int nextIx;

    /**
     * Creates a queue whose first expected index is {@code 0}.
     *
     * @param ixFunction maps an element to its sequence index
     * @throws NullPointerException if {@code ixFunction} is {@code null}
     */
    public ConsecutiveBlockingQueue(ToIntFunction<E> ixFunction) {
        this(0, ixFunction);
    }

    /**
     * Creates a queue whose first expected index is {@code startIx}.
     *
     * @param startIx index of the first element to be released
     * @param ixFunction maps an element to its sequence index
     * @throws NullPointerException if {@code ixFunction} is {@code null}
     */
    public ConsecutiveBlockingQueue(int startIx, ToIntFunction<E> ixFunction) {
        if (ixFunction == null) {
            throw new NullPointerException("ixFunction");
        }
        this.nextIx = startIx;
        this.ixFunction = ixFunction;
        // Order the buffer by sequence index rather than by the elements' natural
        // ordering: E need not be Comparable, and even when it is, its natural order
        // is not guaranteed to agree with the index order.
        this.pQueue = new PriorityQueue<E>(
                (a, b) -> Integer.compare(ixFunction.applyAsInt(a), ixFunction.applyAsInt(b)));
    }

    /** Returns an iterator over the released elements only. */
    @Override
    public Iterator<E> iterator() {
        return bQueue.iterator();
    }

    /** Returns the number of released elements; buffered elements are not counted. */
    @Override
    public int size() {
        return bQueue.size();
    }

    /** Returns the queue that holds the released elements. */
    protected BlockingQueue<E> delegate() {
        return bQueue;
    }

    /** Always returns {@code Integer.MAX_VALUE}. */
    @Override
    public int remainingCapacity() {
        return Integer.MAX_VALUE;
    }

    /**
     * Moves all released elements into {@code c}.
     *
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c) {
        rejectSelf(c);
        return bQueue.drainTo(c);
    }

    /**
     * Moves at most {@code maxElements} released elements into {@code c}.
     *
     * @throws IllegalArgumentException if {@code c} is this queue
     */
    @Override
    public int drainTo(Collection<? super E> c, int maxElements) {
        rejectSelf(c);
        return bQueue.drainTo(c, maxElements);
    }

    /**
     * Inserts the element without ever blocking. An element whose index has already
     * been released is silently discarded.
     *
     * @throws NullPointerException if {@code e} is {@code null}
     */
    @Override
    public void put(E e) {
        offer(e);
    }

    /**
     * Inserts the element without ever blocking; the timeout is ignored. Always returns
     * {@code true}, even when a stale element was discarded, matching the original
     * contract of this method.
     *
     * @throws NullPointerException if {@code e} is {@code null}
     */
    @Override
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
        offer(e);
        return true;
    }

    /**
     * Inserts the element. If its index is the next expected one it is released
     * immediately, together with any buffered elements that follow it consecutively;
     * if its index is higher it is buffered.
     *
     * @return {@code true} if the element was released or buffered, {@code false} if it
     *     was discarded because its index had already been released
     * @throws NullPointerException if {@code e} is {@code null}
     */
    @Override
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException("element");
        }
        final int ix = ixFunction.applyAsInt(e);
        lock.lock();
        try {
            if (ix < nextIx) {
                // Already released: buffering it would leave it in pQueue forever.
                return false;
            }
            if (ix != nextIx) {
                // Ahead of its turn: park it until its predecessors arrive.
                pQueue.offer(e);
                return true;
            }
            bQueue.offer(e);
            nextIx++;
            releaseBuffered();
            return true;
        } finally {
            lock.unlock();
        }
    }

    /** Retrieves the next released element, waiting if none is available yet. */
    @Override
    public E take() throws InterruptedException {
        return bQueue.take();
    }

    /** Retrieves the next released element, waiting up to the given timeout. */
    @Override
    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        return bQueue.poll(timeout, unit);
    }

    /** Retrieves the next released element, or {@code null} if none is available. */
    @Override
    public E poll() {
        return bQueue.poll();
    }

    /** Returns, without removing, the next released element, or {@code null}. */
    @Override
    public E peek() {
        return bQueue.peek();
    }

    /**
     * Moves buffered elements into the released queue for as long as they continue the
     * sequence. Must be called with {@link #lock} held.
     */
    private void releaseBuffered() {
        E head;
        while ((head = pQueue.peek()) != null) {
            final int headIx = ixFunction.applyAsInt(head);
            if (headIx > nextIx) {
                // Gap in the sequence: wait for the missing element.
                break;
            }
            pQueue.poll();
            if (headIx == nextIx) {
                bQueue.offer(head);
                nextIx++;
            }
            // headIx < nextIx: a duplicate of an element that was just released; drop
            // it so it cannot block the elements queued behind it.
        }
    }

    /** Enforces the BlockingQueue rule that a queue cannot be drained into itself. */
    private void rejectSelf(Collection<? super E> c) {
        if (c == this) {
            throw new IllegalArgumentException("cannot drain a queue into itself");
        }
    }
}

最新更新