我一直在撞我的头(我的尝试)在一个无锁的多生产者多消费者环缓冲区。该思想的基础是使用unsigned char和unsigned short类型的固有溢出,将元素缓冲区固定为这两种类型中的任何一种,然后您就可以自由循环回到环形缓冲区的开头。
问题是-我的解决方案不适用于多个生产者(尽管它适用于N个消费者,以及单个生产者单个消费者)。
#include <atomic>
template<typename Element, typename Index = unsigned char> struct RingBuffer
{
std::atomic<Index> readIndex;
std::atomic<Index> writeIndex;
std::atomic<Index> scratchIndex;
Element elements[1 << (sizeof(Index) * 8)];
RingBuffer() :
readIndex(0),
writeIndex(0),
scratchIndex(0)
{
;
}
bool push(const Element & element)
{
while(true)
{
const Index currentReadIndex = readIndex.load();
Index currentWriteIndex = writeIndex.load();
const Index nextWriteIndex = currentWriteIndex + 1;
if(nextWriteIndex == currentReadIndex)
{
return false;
}
if(scratchIndex.compare_exchange_strong(
currentWriteIndex, nextWriteIndex))
{
elements[currentWriteIndex] = element;
writeIndex = nextWriteIndex;
return true;
}
}
}
bool pop(Element & element)
{
Index currentReadIndex = readIndex.load();
while(true)
{
const Index currentWriteIndex = writeIndex.load();
const Index nextReadIndex = currentReadIndex + 1;
if(currentReadIndex == currentWriteIndex)
{
return false;
}
element = elements[currentReadIndex];
if(readIndex.compare_exchange_strong(
currentReadIndex, nextReadIndex))
{
return true;
}
}
}
};
写的主要思想是使用一个临时索引'scratchIndex'作为伪锁,在更新writeIndex之前,只允许一个生产者在任何时候复制构造到元素缓冲区中,并允许任何其他生产者进行进展。在我因为暗示我的方法是"无锁"而被称为异教徒之前,我意识到这种方法不是完全无锁,但在实践中(如果它能工作的话!)它比普通的互斥锁要快得多!
我知道这里有一个(更复杂的)MPMC环缓冲解决方案http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue,但我真的在试验我的想法,然后与这种方法进行比较,并找出每个人的优点(或者我的方法是否真的失败了!)。
我尝试过的事情;
- 使用compare_exchange_weak
- 使用更精确的std::memory_order's匹配我想要的行为
- 在各种索引之间添加cacheline垫
- 使元素std::原子而不是元素数组
我敢肯定,这归结为一个基本的段错误在我的头脑中,如何使用原子访问绕过使用互斥锁,我将完全感谢谁能指出哪些神经元是彻底错误在我的头!:)
这是a - b - a问题的一种形式。一个成功的制作人应该是这样的:
- load
currentReadIndex
- load
currentWriteIndex
cmpxchg store - 存储
element
- store
writeIndex = nextWriteIndex
scratchIndex = nextWriteIndex
如果一个生产者由于某种原因在步骤2和步骤3之间停留了足够长的时间,那么其他生产者就有可能生成整个队列的数据,并绕回完全相同的索引,以便步骤3中的比较交换成功(因为scratchIndex恰好又等于currentWriteIndex)。
这本身不是问题。停滞的生产者完全有权利增加scratchIndex
来锁定队列——即使一个神奇的检测aba的cmpxchg拒绝了存储,生产者只需要再试一次,重新加载完全相同的currentWriteIndex
,然后正常进行。
实际问题是步骤2和步骤3之间的nextWriteIndex == currentReadIndex
检查。如果currentReadIndex == currentWriteIndex
,这个队列在逻辑上是空的,所以这个检查的存在是为了确保没有生产者走得太远,以至于它覆盖了还没有消费者弹出的元素。在顶部做一次检查似乎是安全的,因为所有的消费者都应该被"困"在观察到的currentReadIndex
和观察到的currentWriteIndex
之间。
除了另一个生产者可以出现并提高writeIndex
,这将消费者从陷阱中解放出来。如果一个生产者在步骤2和步骤3之间停顿,当它唤醒readIndex
的存储值可以是任何值。
下面是一个例子,从一个空队列开始,它显示了问题的发生:
- 生产者A运行步骤1和2。两个加载的索引都是0。队列为空
- 生产者B中断并产生一个元素。
- Consumer弹出一个元素。两个指标均为1.
- 生产者B多生产255个元素。写索引转到0,读索引仍然是1。
-
生产者A从沉睡中醒来。它之前将读取和写入索引都加载为0(空队列!),因此它尝试第3步。因为另一个生产者恰好在索引0上暂停,所以比较交换成功,存储继续进行。在完成时,生产者让
writeIndex = 1
,现在两个存储的索引都是1,队列在逻辑上是空的。一个完整队列的元素值现在将被完全忽略。
(我应该提到的是,我可以不受影响地谈论"拖延"one_answers"唤醒"的唯一原因是所有使用的原子都是顺序一致的,所以我可以假装我们在单线程环境中。)
注意,你使用scratchIndex
保护并发写的方式本质上是一个锁;成功完成CMPXCHG的人获得对队列的全部写访问权,直到它释放锁。修复此故障的最简单方法是将scratchIndex
替换为自旋锁-它不会受到a - b - a的影响,并且它是实际发生的。
bool push(const Element & element)
{
while(true)
{
const Index currentReadIndex = readIndex.load();
Index currentWriteIndex = writeIndex.load();
const Index nextWriteIndex = currentWriteIndex + 1;
if(nextWriteIndex == currentReadIndex)
{
return false;
}
if(scratchIndex.compare_exchange_strong(
currentWriteIndex, nextWriteIndex))
{
elements[currentWriteIndex] = element;
// Problem here!
writeIndex = nextWriteIndex;
return true;
}
}
}
我已经把有问题的地方标出来了。多个线程可以同时访问writeIndex = nextWriteIndex。数据将以任意顺序写入,尽管每次写入都是原子的。
这是一个问题,因为您试图使用相同的原子条件更新两个值,这通常是不可能的。假设方法的其余部分没问题,解决这个问题的一种方法是将scratchIndex和writeIndex组合成一个大小加倍的值。例如,将两个uint32_t值视为一个uint64_t值,并对其进行自动操作。