我想从原子uint32s中拼凑出一个uint64原子计数器。计数器有一个写入程序和多个读卡器。写入程序是一个信号处理程序,因此它不能阻塞。
我的想法是使用低位的生成计数作为读锁。读取器重试,直到整个读取过程中生成计数稳定,并且低位未设置。
以下代码在设计和使用内存排序时是否正确?有更好的方法吗?
using namespace std;
class counter {
atomic<uint32_t> lo_{};
atomic<uint32_t> hi_{};
atomic<uint32_t> gen_{};
uint64_t read() const {
auto acquire = memory_order_acquire;
uint32_t lo, hi, gen1, gen2;
do {
gen1 = gen_.load(acquire);
lo = lo_.load(acquire);
hi = hi_.load(acquire);
gen2 = gen_.load(acquire);
} while (gen1 != gen2 || (gen1 & 1));
return (uint64_t(hi) << 32) | lo;
}
void increment() {
auto release = memory_order_release;
gen_.fetch_add(1, release);
uint32_t newlo = 1 + lo_.fetch_add(1, release);
if (newlo == 0) {
hi_.fetch_add(1, release);
}
gen_.fetch_add(1, release);
}
};
编辑:哇,固定auto acquire = memory_order_release;
这是一种已知的模式,称为SeqLock。https://en.wikipedia.org/wiki/Seqlock.(简化为只有一个编写器,因此不需要额外的支持来排除同时编写器。)它不是无锁的;一个作家在错误的时间睡觉会让读者旋转,直到作家写完为止。但在没有发生这种情况的常见情况下,它具有出色的性能,在真正只读的读取器之间没有争用。
您不需要也不希望有效负载的增量使用原子RMW操作。(除非你所在的系统可以廉价地进行64位原子添加或加载,否则不要使用SeqLock。)
您可以使用32位原子加载来同时加载两半,然后递增,然后原子存储结果。(使用廉价的relaxed
或release
内存顺序作为有效负载,并使用release
存储进行第二个序列计数器更新,即所谓的"生成"计数器)。
类似地,序列计数器也不需要是原子RMW。(除非您将其用作具有多个写入程序的旋转锁)
单个写入程序只需要纯加载和仅release
排序的纯存储,这比原子RMW(便宜得多),或者使用seq_cst
排序的存储:
- 以任何顺序加载计数器和值
- 存储一个新计数器(旧+1)
- 存储新的值(或者如果你想不进位分支,只更新下半部分)
- 存储最终计数器
这3个要点中的商店排序是唯一重要的。第一个存储之后的写围栏可能很好,因为我们真的不希望在比relaxed
更贵的CPU上同时制作值为release
的两个存储的成本。
不幸的是,为了满足C++规则,value
必须是atomic<T>
,这使得编译器无法生成尽可能高效的代码来加载两半。例如ARMldrd
或ldp
/stp
负载对直到ARMv8.4a才保证是原子的,但这并不重要。(编译器通常不会将两个独立的32位原子加载优化为一个更宽的加载。)
其他线程在序列计数器为奇数时读取的值是不相关的,但我们希望避免未定义的行为。也许我们可以使用volatile uint64_t
和atomic<uint64_t>
的并集
我为另一个问题写了这个C++SeqLock<class T>
模板,我没有写完答案(弄清楚哪些版本的ARM有64位原子加载和存储)。
这将尝试检查目标是否已经支持atomic<T>
上的无锁原子操作,以在毫无意义时阻止您使用它。(为定义IGNORE_SIZECHECK
的测试目的禁用该选项。)TODO:透明地返回到执行此操作,可能使用模板专用化,而不是使用static_assert
。
我为T
提供了一个支持++
运算符的inc()
函数。TODO将是一个apply()
,它接受lambda对T
执行某些操作,并在序列计数器更新之间存储结果。
// **UNTESTED**
#include <atomic>
#ifdef UNIPROCESSOR
// all readers and writers run on the same core (or same software thread)
// ordering instructions at compile time is all that's necessary
#define ATOMIC_FENCE std::atomic_signal_fence
#else
// A reader can be running on another core while writing.
// Memory barriers or ARMv8 acquire / release loads / store are needed
#define ATOMIC_FENCE std::atomic_thread_fence
#endif
// using fences instead of .store(std::memory_order_release) will stop the compiler
// from taking advantage of a release-store instruction instead of separate fence, like on AArch64
// But fences allow it to be optimized away to just compile-time ordering for the single thread or unirprocessor case.
// SINGLE WRITER only.
// uses volatile + barriers for the data itself, like pre-C++11
template <class T>
class SeqLocked
{
#ifndef IGNORE_SIZECHECK
// sizeof(T) > sizeof(unsigned)
static_assert(!std::atomic<T>::is_always_lock_free, "A Seq Lock with a type small enough to be atomic on its own is totally pointless, and we don't have a specialization that replaces it with a straight wrapper for atomic<T>");
#endif
// C++17 doesn't have a good way to express a load that doesn't care about tearing
// without explicitly writing it as multiple small parts and thus gimping the compiler if it can use larger loads
volatile T data; // volatile should be fine on any implementation where pre-C++11 lockless code was possible with volatile,
// even though Data Race UB does apply to volatile variables in ISO C++11 and later.
// even non-volatile normally works in practice, being ordered by compiler barriers.
std::atomic<unsigned> seqcount{0}; // Even means valid, odd means modification in progress.
// unsigned definitely wraps around at a power of 2 on overflow
public:
T get() const {
unsigned c0, c1;
T tmp;
// READER RETRY LOOP
do {
c0 = seqcount.load(std::memory_order_acquire); // or for your signal-handler use-case, relaxed load followed by ATOMIC_FENCE(std::memory_order_acquire);
tmp = (T)data; // load
ATOMIC_FENCE(std::memory_order_acquire); // LoadLoad barrier
c1 = seqcount.load(std::memory_order_relaxed);
} while(c0&1 || c0 != c1); // retry if the counter changed or is odd
return tmp;
}
// TODO: a version of this that takes a lambda for the operation on tmp
T inc() // WRITER
{
unsigned orig_count = seqcount.load(std::memory_order_relaxed);
// we're the only writer, avoid an atomic RMW.
seqcount.store(orig_count+1, std::memory_order_relaxed);
ATOMIC_FENCE(std::memory_order_release); // 2-way barrier *after* the store, not like a release store. Or like making data=tmp a release operation.
// make sure the counter becomes odd *before* any data change
T tmp = data; // load into a non-volatile temporary
++tmp; // make any change to it
data = tmp; // store
seqcount.store(orig_count+2, std::memory_order_release); // or use ATOMIC_FENCE(std::memory_order_release); *before* this, so the UNIPROCESSOR case can just do compile-time ordering
return tmp;
}
void set(T newval) {
unsigned orig_count = seqcount.load(std::memory_order_relaxed);
seqcount.store(orig_count+1, std::memory_order_relaxed);
ATOMIC_FENCE(std::memory_order_release);
// make sure the data stores appear after the first counter update.
data = newval; // store
ATOMIC_FENCE(std::memory_order_release);
seqcount.store(orig_count+2, std::memory_order_relaxed); // Or use mo_release here, better on AArch64
}
};
/***** test callers *******/
#include <stdint.h>
struct sixteenbyte {
//unsigned arr[4];
unsigned long a,b,c,d;
sixteenbyte() = default;
sixteenbyte(const volatile sixteenbyte &old)
: a(old.a), b(old.b), c(old.c), d(old.d) {}
//arr(old.arr) {}
};
void test_inc(SeqLocked<uint64_t> &obj) { obj.inc(); }
sixteenbyte test_get(SeqLocked<sixteenbyte> &obj) { return obj.get(); }
//void test_set(SeqLocked<sixteenbyte> &obj, sixteenbyte val) { obj.set(val); }
uint64_t test_get(SeqLocked<uint64_t> &obj) {
return obj.get();
}
// void atomic_inc_u64_seq_cst(std::atomic<uint64_t> &a) { ++a; }
uint64_t u64_inc_relaxed(std::atomic<uint64_t> &a) {
// same but without dmb barriers
return 1 + a.fetch_add(1, std::memory_order_relaxed);
}
uint64_t u64_load_relaxed(std::atomic<uint64_t> &a) {
// gcc uses LDREXD, not just LDRD?
return a.load(std::memory_order_relaxed);
}
void u64_store_relaxed(std::atomic<uint64_t> &a, uint64_t val) {
// gcc uses a LL/SC retry loop even for a pure store?
a.store(val, std::memory_order_relaxed);
}
它在用于ARM和其他ISAs的Godbolt编译器资源管理器上编译到我们想要的asm。至少对于int64_t;由于繁琐的CCD_ 25规则,较大的结构类型的复制效率可能较低
它使用非原子volatile T data
作为共享数据。从技术上讲,这是未定义的数据竞赛行为,但我们在实践中使用的所有编译器都可以对C++11之前的volatile
对象进行多线程访问。在C++11之前,人们甚至在一定程度上依赖原子性。我们做而不是,我们检查计数器,只有在没有并发写入的情况下才使用我们读取的值。(这就是SeqLock的全部意义。)
volatile T data
的一个问题是,在ISO C++中,T foo = data
不会为结构对象编译,除非您提供来自volatile
对象的复制构造函数,如
sixteenbyte(const volatile sixteenbyte &old)
: a(old.a), b(old.b), c(old.c), d(old.d) {}
这对我们来说真的很烦人,因为我们不在乎如何读取内存的细节,只是多个读取没有优化为一个。
volatile
在这里确实是错误的工具,而带有足够围栏的普通T data
会更好,以确保读取实际上发生在原子计数器的读取之间。例如,我们可以在GNU C中使用asm("":::"memory");
编译器来防止访问前后的重新排序。这将允许编译器使用SIMD向量或其他什么来复制更大的对象,而这在单独的volatile
访问中是做不到的。
我认为std::atomic_thread_fence(mo_acquire)
也是一个足够的屏障,但我不能100%确定。
在ISOC中,您可以复制volatile
聚合(结构),编译器将发出通常用于复制这么多字节的任何asm。但在C++中,我们显然不可能有好东西。
相关:在中断处理程序中具有写入程序的单核系统
在一个只有一个核心的嵌入式系统中,一些变量只由中断处理程序更新,您可能有一个可以中断读取器的写入程序,但不能反过来。这允许一些更便宜的变体使用值本身来检测撕裂的读取。
请参阅读取由ISR更新的64位变量,特别是对于单调计数器Brendan的建议,即先读取最高有效半部分,然后读取低半部分,再读取最高有效一半。如果它匹配,你的阅读就不会被撕裂。(一个没有改变上半部分的写入并不是问题,即使它在读取器读取之前或之后中断了读取器来改变下半部分。)
或者通常,重新读取整个值,直到连续两次看到相同的值。
这两种技术都不是SMP安全的:如果写入程序将两部分分开存储,那么读重试只会防止读操作被破坏,而不会防止写操作被破坏。这就是SeqLock使用第三个原子整数作为序列计数器的原因。它们可以在任何情况下工作,其中作者是原子的读者,但读者不是原子的。中断处理程序与主代码就是这样一种情况,或者信号处理程序是等效的。
如果你不介意增加2而不是1,你可能会使用单调计数器的低一半作为序列号。(也许需要读者进行64位右移1才能恢复实际数字。所以这不好。)