ARM 上的无锁 SPSC 队列实现



我正在尝试为 ARM 编写一个单一生产者单一消费者队列,我想我即将对 DMB 进行思考,但需要一些检查(我更熟悉 std::atomic。

这是我所在的地方:

bool push(const_reference value)
{
// Check for room
const size_type currentTail = tail;
const size_type nextTail = increment(currentTail);
if (nextTail == head)
return false;
// Write the value
valueArr[currentTail] = value;
// Prevent the consumer from seeing the incremented tail before the
// value is written.
__DMB();
// Increment tail
tail = nextTail;
return true;
}
bool pop(reference valueLocation)
{
// Check for data
const size_type currentHead = head;
if (currentHead == tail)
return false;
// Write the value.
valueLocation = valueArr[currentHead];
// Prevent the producer from seeing the incremented head before the
// value is written.
__DMB();
// Increment the head
head = increment(head);
return true;
}

我的问题是:我的DMB位置和理由准确吗?还是还有人明白我失踪了?我特别不确定条件在处理由其他线程(或中断(更新的变量时是否需要一些保护。

  • 屏障是必要的,但还不够,您还需要"获取"语义来加载由其他线程修改的 var。 (或者至少consume,但是要做到这一点没有障碍需要asm来创建数据依赖关系。 编译器在已经具有控件依赖项后不会这样做。
  • 单核系统只能使用像GNU Casm("":::"memory")std::atomic_signal_fence(std::memory_order_release)这样的编译器屏障,而不是dmb。 创建一个宏,以便您可以在 SMP 安全屏障或 UP(单处理器(屏障之间进行选择。
  • head = increment(head);是毫无意义的重新加载head,请使用本地副本。
  • 使用std::atomic可移植获取必要的代码生成。

你通常不需要滚动自己的原子;ARM的现代编译器确实实现了std::atomic<T>。 但是AFAIK,没有std::atomic<>实现知道单核系统,以避免实际障碍并且只是安全。可能导致上下文切换的中断。

在单核系统上,你不需要dsb,只需要一个编译器屏障。 CPU将保留asm指令按程序顺序顺序执行的错觉。 您只需要确保编译器生成的asm以正确的顺序执行操作即可。 您可以通过使用带有std::memory_order_relaxedstd::atomic以及手动atomic_signal_fence(memory_order_acquire)release障碍来做到这一点。 (不是atomic_thread_fence;这将发出asm指令,通常dsb(。


每个线程读取另一个线程修改的变量。 您正确地将修改版本存储,方法是确保它们仅在访问阵列后可见。

但这些读取也需要是采集加载,以便与这些发布存储同步。 例如,确保pushpop完成读取同一元素之前没有写入valueArr[currentTail] = value;。 或者在条目完全写入之前阅读条目。

没有任何障碍,故障模式将是if (currentHead == tail) return false;实际上不会从内存中检查tail的值,直到之后valueLocation = valueArr[currentHead];发生了。 运行时负载重新排序可以在弱序 ARM 上轻松完成此操作。 如果加载地址对tail有数据依赖性,则可以避免在SMP系统上需要障碍(ARM保证asm中的依赖性排序;mo_consume应该公开的功能(。 但是,如果编译器只发出一个分支,那只是一个控件依赖项,而不是数据。 如果您在 asm 中手动编写,我认为像ldrne r0, [r1, r2]在比较设置的标志上的谓词负载会产生数据依赖关系。

编译时重新排序不太合理,但如果编译器只是阻止编译器执行它无论如何都不会做的事情,则仅编译器障碍是免费的。

未经测试的

实现,编译为看起来正常但没有其他测试的ASM

push做类似的事情。 我包含了用于加载获取/存储释放和 fullbarrier(( 的包装器函数。 (相当于 Linux 内核的smp_mb()宏,定义为编译时或编译+运行时屏障。

#include <atomic>
#define UNIPROCESSOR

#ifdef UNIPROCESSOR
#define fullbarrier()  asm("":::"memory")   // GNU C compiler barrier
// atomic_signal_fence(std::memory_order_seq_cst)
#else
#define fullbarrier() __DMB()    // or atomic_thread_fence(std::memory_order_seq_cst)
#endif
template <class T>
T load_acquire(std::atomic<T> &x) {
#ifdef UNIPROCESSOR
T tmp = x.load(std::memory_order_relaxed);
std::atomic_signal_fence(std::memory_order_acquire);
// or fullbarrier();  if you want to use that macro
return tmp;
#else
return x.load(std::memory_order_acquire);
// fullbarrier() / __DMB();
#endif
}
template <class T>
void store_release(std::atomic<T> &x, T val) {
#ifdef UNIPROCESSOR
std::atomic_signal_fence(std::memory_order_release);
// or fullbarrier();
x.store(val, std::memory_order_relaxed);
#else
// fullbarrier() / __DMB(); before plain store
return x.store(val, std::memory_order_release);
#endif
}
template <class T>
struct SPSC_queue {
using size_type = unsigned;
using value_type = T;
static const size_type size = 1024;
std::atomic<size_type> head;
value_type valueArr[size];
std::atomic<size_type> tail;  // in a separate cache-line from head to reduce contention
bool push(const value_type &value)
{
// Check for room
const size_type currentTail = tail.load(std::memory_order_relaxed);  // no other writers to tail, no ordering needed
const size_type nextTail = currentTail + 1;    // modulo separately so empty and full are distinguishable.
if (nextTail == load_acquire(head))
return false;
valueArr[currentTail % size] = value;
store_release(tail, nextTail);
return true;
}
};
// instantiate the template for  int  so we can look at the asm
template bool SPSC_queue<int>::push(const value_type &value);

如果您使用-DUNIPROCESSOR,则在 Godbolt 编译器资源管理器上干净地编译,具有零障碍,带有g++9.2 -O3 -mcpu=cortex-a15(只是为了选择一个随机的现代 ARM 内核,以便 GCC 可以内联加载/存储功能和非单处理器情况的屏障std::atomic

最新更新