在java中编写线程安全的阻塞有界缓冲区结构时,如何处理索引溢出



UPDATED:我已经根据下面的正确答案更新了代码。这可以工作,但会产生一个新问题(我会发布一个新问题)。

使用具有多个生产者和消费者的信号量创建阻塞有界缓冲区类。

的目标是使用原子整数作为指针,所以我不必内部同步。溢出处理已经修正为现在使用CAS。

但是,除非我在AtomicInteger指针周围使用synchronized,否则这不起作用(参见下面的注释)。不知道为什么?一直向下滚动,看看我所说的"缺少条目"是什么意思…

public class BoundedBuffer<T> {
private static final int BUFFER_SIZE = Short.MAX_VALUE+1;
private AtomicReferenceArray<T> m_buffer = null;
private Semaphore m_full = new Semaphore(BUFFER_SIZE);
private Semaphore m_empty = new Semaphore(0);
private AtomicInteger m_writePointer = new AtomicInteger();
private AtomicInteger m_readPointer = new AtomicInteger();
public BoundedBuffer() {
    m_buffer = new AtomicReferenceArray<T>(BUFFER_SIZE);
}
public static int safeGetAndIncrement(AtomicInteger i) {
    int oldValue = 0, newValue = 0;
    do {
        oldValue = i.get();
        newValue = (oldValue == Short.MAX_VALUE) ? 0 : (oldValue + 1);
    } while (!i.compareAndSet(oldValue, newValue));
    return oldValue;
}
public void add(T data) throws InterruptedException {
    m_full.acquire();
    synchronized (this) { // << Commenting this doesn't work
        // CAS-based overflow handling
        m_buffer.set(safeGetAndIncrement(m_writePointer),data);
    }
    m_empty.release();
}
public T get() throws InterruptedException {
    T data = null;
    m_empty.acquire();
    synchronized (this) { // << Commenting this doesn't work
        // CAS-based overflow handling
        data = m_buffer.get(safeGetAndIncrement(m_readPointer));
    }
    m_full.release();
    return data;
}
}

测试程序有…

8个生产者线程,每个线程将大约10000个条目放入队列。每个条目是格式为:a ":"B的字符串在哪里A是数字0..7代表8个线程。B是一个从0到9999的递增数

4个消费线程,消费所有东西,直到null。

一旦生产者线程完成向缓冲区中添加所有内容,4个null将被添加到队列中(以停止消费者)。

p:数据,1:9654@1P:数据,5:1097@347C:数据,1:9654@1P:数据,4:5538@1C:数据,4:5538@1C:数据,null@14466

验证

在验证是否所有生成的条目都是消费者时,出现了一些丢失(就在arrayindexoutofbounds被击中之前(可能只是巧合)。

验证……失踪4:5537失踪5:1096验证

在增加计数器时需要处理溢出。例如,您可以使用以下方法代替getAndIncrement():

public static int safeGetAndIncrement(AtomicInteger i) {
    int oldValue = 0;
    do {
        oldValue = i.get();
        int newValue = (oldValue == MAX_VALUE) ? 0 : (oldValue + 1);
    } while (!i.compareAndSet(oldValue, newValue));
    return oldValue;
}

它使用典型的比较和交换方法,不应该损害性能,因为getAndIncrement()在内部以相同的方式实现。

同样,如果MAX_VALUEBUFFER_SIZE,则不需要% BUFFER_SIZE操作。