UPDATED:我已经根据下面的正确答案更新了代码。这可以工作,但会产生一个新问题(我会发布一个新问题)。
使用具有多个生产者和消费者的信号量创建阻塞有界缓冲区类。
的目标是使用原子整数作为指针,所以我不必内部同步。溢出处理已经修正为现在使用CAS。
但是,除非我在AtomicInteger指针周围使用synchronized,否则这不起作用(参见下面的注释)。不知道为什么?一直向下滚动,看看我所说的"缺少条目"是什么意思…
public class BoundedBuffer<T> {
private static final int BUFFER_SIZE = Short.MAX_VALUE+1;
private AtomicReferenceArray<T> m_buffer = null;
private Semaphore m_full = new Semaphore(BUFFER_SIZE);
private Semaphore m_empty = new Semaphore(0);
private AtomicInteger m_writePointer = new AtomicInteger();
private AtomicInteger m_readPointer = new AtomicInteger();
public BoundedBuffer() {
m_buffer = new AtomicReferenceArray<T>(BUFFER_SIZE);
}
public static int safeGetAndIncrement(AtomicInteger i) {
int oldValue = 0, newValue = 0;
do {
oldValue = i.get();
newValue = (oldValue == Short.MAX_VALUE) ? 0 : (oldValue + 1);
} while (!i.compareAndSet(oldValue, newValue));
return oldValue;
}
public void add(T data) throws InterruptedException {
m_full.acquire();
synchronized (this) { // << Commenting this doesn't work
// CAS-based overflow handling
m_buffer.set(safeGetAndIncrement(m_writePointer),data);
}
m_empty.release();
}
public T get() throws InterruptedException {
T data = null;
m_empty.acquire();
synchronized (this) { // << Commenting this doesn't work
// CAS-based overflow handling
data = m_buffer.get(safeGetAndIncrement(m_readPointer));
}
m_full.release();
return data;
}
}
测试程序有…
8个生产者线程,每个线程将大约10000个条目放入队列。每个条目是格式为:a ":"B的字符串在哪里A是数字0..7代表8个线程。B是一个从0到9999的递增数
4个消费线程,消费所有东西,直到null。
一旦生产者线程完成向缓冲区中添加所有内容,4个null将被添加到队列中(以停止消费者)。
p:数据,1:9654@1P:数据,5:1097@347C:数据,1:9654@1P:数据,4:5538@1C:数据,4:5538@1C:数据,null@14466
验证在验证是否所有生成的条目都是消费者时,出现了一些丢失(就在arrayindexoutofbounds被击中之前(可能只是巧合)。
验证……失踪4:5537失踪5:1096验证
在增加计数器时需要处理溢出。例如,您可以使用以下方法代替getAndIncrement()
:
public static int safeGetAndIncrement(AtomicInteger i) {
int oldValue = 0;
do {
oldValue = i.get();
int newValue = (oldValue == MAX_VALUE) ? 0 : (oldValue + 1);
} while (!i.compareAndSet(oldValue, newValue));
return oldValue;
}
它使用典型的比较和交换方法,不应该损害性能,因为getAndIncrement()
在内部以相同的方式实现。
同样,如果MAX_VALUE
是BUFFER_SIZE
,则不需要% BUFFER_SIZE
操作。