无锁有界MPMC环缓冲区故障



我一直在撞我的头(我的尝试)在一个无锁的多生产者多消费者环缓冲区。该思想的基础是使用unsigned char和unsigned short类型的固有溢出,将元素缓冲区固定为这两种类型中的任何一种,然后您就可以自由循环回到环形缓冲区的开头。

问题是-我的解决方案不适用于多个生产者(尽管它适用于N个消费者,以及单个生产者单个消费者)。

#include <atomic>
template<typename Element, typename Index = unsigned char> struct RingBuffer
{
  std::atomic<Index> readIndex;
  std::atomic<Index> writeIndex;
  std::atomic<Index> scratchIndex;
  Element elements[1 << (sizeof(Index) * 8)];
  RingBuffer() :
    readIndex(0),
    writeIndex(0),
    scratchIndex(0)
  {
    ;
  }
  bool push(const Element & element)
  {
    while(true)
    {
      const Index currentReadIndex = readIndex.load();
      Index currentWriteIndex = writeIndex.load();
      const Index nextWriteIndex = currentWriteIndex + 1;
      if(nextWriteIndex == currentReadIndex)
      {
        return false;
      }
      if(scratchIndex.compare_exchange_strong(
        currentWriteIndex, nextWriteIndex))
      {
        elements[currentWriteIndex] = element;
        writeIndex = nextWriteIndex;
        return true;
      }
    }
  }
  bool pop(Element & element)
  {
    Index currentReadIndex = readIndex.load();
    while(true)
    {
      const Index currentWriteIndex = writeIndex.load();
      const Index nextReadIndex = currentReadIndex + 1;
      if(currentReadIndex == currentWriteIndex)
      {
        return false;
      }
      element = elements[currentReadIndex];
      if(readIndex.compare_exchange_strong(
        currentReadIndex, nextReadIndex))
      {
        return true;
      }
    }
  }
};

写的主要思想是使用一个临时索引'scratchIndex'作为伪锁,在更新writeIndex之前,只允许一个生产者在任何时候复制构造到元素缓冲区中,并允许任何其他生产者进行进展。在我因为暗示我的方法是"无锁"而被称为异教徒之前,我意识到这种方法不是完全无锁,但在实践中(如果它能工作的话!)它比普通的互斥锁要快得多!

我知道这里有一个(更复杂的)MPMC环缓冲解决方案http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue,但我真的在试验我的想法,然后与这种方法进行比较,并找出每个人的优点(或者我的方法是否真的失败了!)。

我尝试过的事情;

    使用compare_exchange_weak
  • 使用更精确的std::memory_order's匹配我想要的行为
  • 在各种索引之间添加cacheline垫
  • 使元素std::原子而不是元素数组

我敢肯定,这归结为一个基本的段错误在我的头脑中,如何使用原子访问绕过使用互斥锁,我将完全感谢谁能指出哪些神经元是彻底错误在我的头!:)

这是a - b - a问题的一种形式。一个成功的制作人应该是这样的:

  1. load currentReadIndex
  2. load currentWriteIndex
  3. cmpxchg store scratchIndex = nextWriteIndex
  4. 存储element
  5. store writeIndex = nextWriteIndex

如果一个生产者由于某种原因在步骤2和步骤3之间停留了足够长的时间,那么其他生产者就有可能生成整个队列的数据,并绕回完全相同的索引,以便步骤3中的比较交换成功(因为scratchIndex恰好又等于currentWriteIndex)。

这本身不是问题。停滞的生产者完全有权利增加scratchIndex来锁定队列——即使一个神奇的检测aba的cmpxchg拒绝了存储,生产者只需要再试一次,重新加载完全相同的currentWriteIndex,然后正常进行。

实际问题是步骤2和步骤3之间的nextWriteIndex == currentReadIndex检查。如果currentReadIndex == currentWriteIndex,这个队列在逻辑上是空的,所以这个检查的存在是为了确保没有生产者走得太远,以至于它覆盖了还没有消费者弹出的元素。在顶部做一次检查似乎是安全的,因为所有的消费者都应该被"困"在观察到的currentReadIndex和观察到的currentWriteIndex之间。

除了另一个生产者可以出现并提高writeIndex,这将消费者从陷阱中解放出来。如果一个生产者在步骤2和步骤3之间停顿,当它唤醒readIndex的存储值可以是任何值。

下面是一个例子,从一个空队列开始,它显示了问题的发生:

  1. 生产者A运行步骤1和2。两个加载的索引都是0。队列为空
  2. 生产者B中断并产生一个元素。
  3. Consumer弹出一个元素。两个指标均为1.
  4. 生产者B多生产255个元素。写索引转到0,读索引仍然是1。
  5. 生产者A从沉睡中醒来。它之前将读取和写入索引都加载为0(空队列!),因此它尝试第3步。因为另一个生产者恰好在索引0上暂停,所以比较交换成功,存储继续进行。在完成时,生产者让writeIndex = 1,现在两个存储的索引都是1,队列在逻辑上是空的。一个完整队列的元素值现在将被完全忽略。

(我应该提到的是,我可以不受影响地谈论"拖延"one_answers"唤醒"的唯一原因是所有使用的原子都是顺序一致的,所以我可以假装我们在单线程环境中。)


注意,你使用scratchIndex保护并发写的方式本质上是一个锁;成功完成CMPXCHG的人获得对队列的全部写访问权,直到它释放锁。修复此故障的最简单方法是将scratchIndex替换为自旋锁-它不会受到a - b - a的影响,并且它是实际发生的。

 bool push(const Element & element)
  {
    while(true)
    {
      const Index currentReadIndex = readIndex.load();
      Index currentWriteIndex = writeIndex.load();
      const Index nextWriteIndex = currentWriteIndex + 1;
      if(nextWriteIndex == currentReadIndex)
      {
        return false;
      }
      if(scratchIndex.compare_exchange_strong(
        currentWriteIndex, nextWriteIndex))
      {
        elements[currentWriteIndex] = element;
        // Problem here!
        writeIndex = nextWriteIndex;
        return true;
      }
    }
  }

我已经把有问题的地方标出来了。多个线程可以同时访问writeIndex = nextWriteIndex。数据将以任意顺序写入,尽管每次写入都是原子的。

这是一个问题,因为您试图使用相同的原子条件更新两个值,这通常是不可能的。假设方法的其余部分没问题,解决这个问题的一种方法是将scratchIndex和writeIndex组合成一个大小加倍的值。例如,将两个uint32_t值视为一个uint64_t值,并对其进行自动操作。

相关内容

  • 没有找到相关文章

最新更新