我开发了一个队列,允许单个消费者和生产者同时提供/轮询队列中的元素,而无需对每个提供/轮询进行同步或CAS操作。相反,当队列的尾部为空时,只需要有一个原子操作。这个队列的目的是在队列被缓冲并且消费者没有赶上生产者的情况下减少延迟。
在这个问题上,我想回顾一下实现(代码还没有被其他人审查过,所以很高兴能得到第二个意见),并讨论一种我认为应该显著减少延迟的使用模式,以及这种架构是否可能比LMAX颠覆器运行得更快。
代码位于github上:https://github.com/aranhakki/experimental-performance/blob/master/java/src/concurrency/messaging/ConcurrentPollOfferArrayQueue.java
/*
* Copyright 2014 Aran Hakki
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package concurrency.messaging;
// A non-blocking queue which allows concurrent offer and poll operations with minimal contention.
// Contention in offer and poll operations only occurs when offerArray, which acts as an incomming message buffer,
// becomes full, and we must wait for it too be swapped with the pollArray, acting as a outgoing message buffer,
// the most simple analogy would be too imaging two buckets, one we fill and at the same time we empty another bucket
// which already contains some liquid, then at the point the initial bucket becomes full, we swap it with the bucket that
// is being emptied.
// It's possible that this mechanism might be faster than the LMAX disruptor, need to create tests to confirm.
public final class ConcurrentPollOfferArrayQueue<T> {
private T[] offerArray;
private T[] pollArray;
public ConcurrentPollOfferArrayQueue(T[] _pollArray){
offerArray = (T[]) new Object[_pollArray.length];
pollArray = _pollArray;
}
private int offerIndex = 0;
private int pollIndex = 0;
public void offer(T t){
if (offerIndex<offerArray.length){
offerArray[offerIndex] = t;
offerIndex++;
} else {
while(!arrayFlipped){
}
arrayFlipped = false;
offerIndex = 0;
offer(t);
}
}
private volatile boolean arrayFlipped = false;
public T poll(){
if (pollIndex<pollArray.length){
T t = pollArray[pollIndex];
pollArray[pollIndex] = null;
pollIndex++;
return t;
} else {
pollIndex = 0;
T[] pollArrayTmp = pollArray;
pollArray = offerArray;
offerArray = pollArrayTmp;
arrayFlipped = true;
return poll();
}
}
}
通过使用许多这样的队列来代替多个生产者和消费者,他们都引用同一个队列,我认为可以显著减少延迟。
考虑生产者A、B、C都引用单个队列Q,消费者E、E和F都引用同一队列。这导致了以下一组关系,因此引发了很多争论:
A写入Q
B写入Q
C写入Q
E写入Q
D写入Q
F写入Q
使用我开发的队列,可以在每个生产者和单个消费者聚合线程之间有一个队列,该线程将获取每个生产者队列尾部的元素,并将它们放在消费者队列的头部。这将大大减少争用,因为我们对一段内存只有一个写入程序。关系船现在看起来如下:
一篇关于(AQ)的文章
B写信给(BQ)负责人
C写入(CQ)的头部
ConsumerAggregation线程写入尾部(AQ)
消费者聚合线程写入尾部(BQ)
消费者聚合线程写入尾部(CQ)
消费者聚合线程写入(EQ)的头部
ConsumerAggregation线程写入(FQ)的头部
消费者聚合线程写入(GQ)的头部
E写入(EQ)尾部
F写入尾部(FQ)
G写入尾部(GQ)
上述关系确保了单一作者原则。
我很想听听你的想法。
你们觉得这个实现怎么样?我已经更改了它,以便在pollQueue为空时,轮询线程触发队列切换。
/*
* Copyright 2014 Aran Hakki
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
* A non-blocking queue which allows concurrent offer and poll operations with minimal contention.
* Contention in offer and poll operations only occurs when pollQueue is empty and must be swapped with offer queue.
* This implementation does not make use of any low level Java memory optimizations e.g. using the Unsafe class or direct byte buffers,
* so its possible it could run much faster.
* If re-engineered to use lower level features its possible that this approach might be faster than the LMAX disruptor.
* I'm current observing an average latency of approx 6000ns.
*/
package concurrency.messaging;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
public class ConcurrentPollOfferQueue<T> {
private class ThreadSafeSizeQueue<T> {
private Queue<T> queue = new LinkedList<T>();
private volatile AtomicInteger size = new AtomicInteger(0);
public int size(){
return size.get();
}
public void offer(T value){
queue.offer(value);
size.incrementAndGet();
}
public T poll(){
T value = queue.poll();
if (value!=null){
size.decrementAndGet();
}
return value;
}
}
private volatile ThreadSafeSizeQueue<T> offerQueue;
private volatile ThreadSafeSizeQueue<T> pollQueue;
private int capacity;
public ConcurrentPollOfferQueue(int capacity){
this.capacity = capacity;
offerQueue = new ThreadSafeSizeQueue<T>();
pollQueue = new ThreadSafeSizeQueue<T>();
}
public void offer(T value){
while(offerQueue.size()==capacity){/* wait for consumer to finishing consuming pollQueue */}
offerQueue.offer(value);
}
public T poll(){
T polled;
while((polled = pollQueue.poll())==null){
if (pollQueue.size()==0){
ThreadSafeSizeQueue<T> tmpQueue = offerQueue;
offerQueue = pollQueue;
pollQueue = tmpQueue;
}
}
return polled;
}
public boolean isEmpty(){
return pollQueue.size()==0;
}