简介
我有一个小类,它利用 std::atomic 进行无锁操作。由于这个类被大量调用,它会影响性能,我遇到了麻烦。
类说明
该类类似于 LIFO,但一旦调用 pop() 函数,它只返回其环形缓冲区的最后一个写入元素(仅当自上次 pop() 以来有新元素时)。
一个线程正在调用 push(),另一个线程正在调用 pop()。
我读过的来源
由于这占用了我计算机时间的太多时间,我决定进一步研究std::atomic类及其memory_order。我已经阅读了很多memory_order StackOverflow和其他来源和书籍中可用的帖子,但我无法清楚地了解不同的模式。特别是,我在获取模式和发布模式之间挣扎:我也看不出为什么它们与memory_order_seq_cst不同。
我认为每个记忆顺序都用我的话做了什么,来自我自己的研究
memory_order_relaxed:在同一线程中,原子操作是即时的,但其他线程可能无法立即看到最新的值,它们需要一些时间才能更新。代码可以由编译器或操作系统自由重新排序。
memory_order_acquire/发布:由atomic::load使用。它可以防止在此之前的代码行被重新排序(编译器/操作系统可以在此行之后重新排序),并使用此线程或其他线程中的memory_order_release或memory_order_seq_cst读取存储在此原子上的最新值。memory_order_release还会阻止该代码在重新排序后对其进行排序。因此,在获取/发布中,操作系统可以对两者之间的所有代码进行洗牌。我不确定这是在同一线程之间还是在不同的线程之间。
memory_order_seq_cst:最容易使用,因为它就像我们使用变量的自然编写一样,立即刷新其他线程加载函数的值。
LockFreeEx 类
template<typename T>
class LockFreeEx
{
public:
void push(const T& element)
{
const int wPos = m_position.load(std::memory_order_seq_cst);
const int nextPos = getNextPos(wPos);
m_buffer[nextPos] = element;
m_position.store(nextPos, std::memory_order_seq_cst);
}
const bool pop(T& returnedElement)
{
const int wPos = m_position.exchange(-1, std::memory_order_seq_cst);
if (wPos != -1)
{
returnedElement = m_buffer[wPos];
return true;
}
else
{
return false;
}
}
private:
static constexpr int maxElements = 8;
static constexpr int getNextPos(int pos) noexcept {return (++pos == maxElements)? 0 : pos;}
std::array<T, maxElements> m_buffer;
std::atomic<int> m_position {-1};
};
我期望它如何改进
因此,我的第一个想法是在所有原子操作中使用memory_order_relaxed,因为 pop() 线程在循环中每 10-15 毫秒在 pop 函数中寻找可用的更新,然后允许在第一个 pop() 函数中失败,以便稍后意识到有新的更新。这只是一堆毫秒。
另一种选择是使用发布/获取 - 但我不确定它们。在所有store() 中使用 release 并在所有load() 函数中使用 collection。
不幸的是,我描述的所有memory_order似乎都有效,我不确定它们什么时候会失败,如果它们应该失败的话。
最后
请问,您能否告诉我,如果您在这里使用宽松的内存顺序遇到问题?或者我应该使用释放/获取(也许对这些的进一步解释可以帮助我)?为什么?
我认为放松是这个类最好的,在其所有存储()或load()中。但我不确定!
感谢您的阅读。
编辑:额外说明:
由于我看到每个人都在要求"char",所以我将其更改为int,问题解决了!但这不是我想解决的问题。
正如我之前所说,该类很可能是LIFO的东西,但只有最后一个推动的元素(如果有的话)才重要。
我有一个大结构 T(可复制和可分配),我必须以无锁的方式在两个线程之间共享。因此,我知道的唯一方法是使用循环缓冲区来写入 T 的最后一个已知值,以及一个知道写入最后一个值的索引的原子。如果没有索引,索引将为 -1。
请注意,我的推送线程必须知道何时有"新 T"可用,这就是 pop() 返回布尔值的原因。
再次感谢所有试图帮助我完成记忆订单的人! :)
阅读后解决方案:
template<typename T>
class LockFreeEx
{
public:
LockFreeEx() {}
LockFreeEx(const T& initValue): m_data(initValue) {}
// WRITE THREAD - CAN BE SLOW, WILL BE CALLED EACH 500-800ms
void publish(const T& element)
{
// I used acquire instead relaxed to makesure wPos is always the lastest w_writePos value, and nextPos calculates the right one
const int wPos = m_writePos.load(std::memory_order_acquire);
const int nextPos = (wPos + 1) % bufferMaxSize;
m_buffer[nextPos] = element;
m_writePos.store(nextPos, std::memory_order_release);
}
// READ THREAD - NEED TO BE VERY FAST - CALLED ONCE AT THE BEGGINING OF THE LOOP each 2ms
inline void update()
{
// should I change to relaxed? It doesn't matter I don't get the new value or the old one, since I will call this function again very soon, and again, and again...
const int writeIndex = m_writePos.load(std::memory_order_acquire);
// Updating only in case there is something new... T may be a heavy struct
if (m_readPos != writeIndex)
{
m_readPos = writeIndex;
m_data = m_buffer[m_readPos];
}
}
// NEED TO BE LIGHTNING FAST, CALLED MULTIPLE TIMES IN THE READ THREAD
inline const T& get() const noexcept {return m_data;}
private:
// Buffer
static constexpr int bufferMaxSize = 4;
std::array<T, bufferMaxSize> m_buffer;
std::atomic<int> m_writePos {0};
int m_readPos = 0;
// Data
T m_data;
};
内存顺序不是关于何时看到原子对象的某些特定更改,而是关于此更改可以保证周围代码的内容。松弛原子除了原子对象本身的变化之外什么也不保证:变化将是原子的。但是,不能在任何同步上下文中使用宽松的原子。
你有一些需要同步的代码。你想弹出一些被推动的东西,而不是试图弹出尚未被推动的东西。因此,如果您使用宽松的操作,则无法保证您的pop会看到此推送代码:
m_buffer[nextPos] = element;
m_position.store(nextPos, std::memory_relaxed);
正如它所写的那样。它也可以这样看:
m_position.store(nextPos, std::memory_relaxed);
m_buffer[nextPos] = element;
因此,您可以尝试从缓冲区中获取尚未存在的元素。因此,您必须使用一些同步,并且至少使用获取/释放内存顺序。
以及您的实际代码。我认为顺序可以如下:
const char wPos = m_position.load(std::memory_order_relaxed);
...
m_position.store(nextPos, std::memory_order_release);
...
const char wPos = m_position.exchange(-1, memory_order_acquire);
你的作者只需要release
,不需要seq-cst,但relaxed
太弱了。 在对相应的m_buffer[]
条目进行非原子赋值之前,无法发布m_position
的值。您需要发布排序,以确保m_position
存储仅在所有早期内存操作之后对其他线程可见。(包括非原子赋值)。 https://preshing.com/20120913/acquire-and-release-semantics/
这必须与读取器中的采集或seq_cst负载"同步"。 或者至少在读者中mo_consume
。
从理论上讲,您还需要wpos = m_position
至少acquire
(或consume
在读取器中),而不是放松,因为C++11的内存模型对于诸如值预测之类的事情足够弱,这可以让编译器在加载实际从相干缓存中获取值之前推测性地使用wPos
值。
(在实际的CPU上,一个疯狂的编译器可以使用test/branch来引入控制依赖关系,允许分支预测+推测执行来打破数据依赖性
,可能的值为wPos
。但是对于普通编译器不会这样做。 在 DEC Alpha 以外的 CPU 上,wPos = m_position
源代码中的数据依赖关系,然后使用m_buffer[wPos]
将在 asm 中创建数据依赖关系,就像mo_consume
应该利用的那样。 除 Alpha 之外的实际 ISA 保证依赖负载的依赖性排序。 (即使在 Alpha 上,使用宽松的原子交换也足以关闭少数允许这种重新排序的真正 Alpha CPU 上存在的小窗口。
当编译x86时,使用mo_acquire
根本没有缺点;它不需要任何额外的障碍。 在其他 ISA 上可能存在,例如 32 位 ARM,其中acquire
成本会形成障碍,因此在宽松负载下"作弊"可能是一种在实践中仍然安全的胜利。 当前的编译器总是加强mo_consume
mo_acquire
因此不幸的是我们无法利用它。
即使使用seq_cst
,您也已经有一个实词争用条件。
- 初始状态:
m_position = 0
- 读者通过交换
m_position = -1
来"声明"插槽 0,并读取部分m_buffer[0];
- 读者出于某种原因睡觉(例如计时器中断取消了它的时间表),或者只是与作家赛跑。
- 编写者将
wPos = m_position
读作-1
,并计算nextPos = 0
。
它会覆盖部分读取的m_buffer[0]
- 读者醒来,读完,得到一个撕裂的
T &element
。 数据竞赛 UB 在C++抽象机器中,并在实践中撕裂。
在读取后添加第二次m_position
检查(如 SeqLock)无法在每种情况下都检测到这一点,因为编写器直到写入缓冲区元素后才会更新m_position
。
即使您的实际用例在读取和写入之间有很长的间隔,但这个缺陷几乎同时发生一次读取和写入就会咬你。
我肯定知道读取端不能什么都不等,也不能停止(它是音频),它每 5-10 毫秒弹出一次,写入端是用户输入,它更慢,更快的可以每 500 毫秒推送一次。
在现代 CPU 上,毫秒是年龄。线程间延迟通常约为 60 ns,因此几分之一微秒,例如来自四核 Intel x86。 只要你不睡在互斥锁上,在放弃之前旋转重试一两次都不是问题。
代码审查:
该类类似于 LIFO,但一旦调用 pop() 函数,它只返回其环形缓冲区的最后一个写入元素(仅当自上次 pop() 以来有新元素时)。
这不是一个真正的队列或堆栈:push 和 pop 不是好名字。 "发布"和"读取"或"获取"可能会更好,并使它的用途更加明显。
我会在代码中包含注释来描述这对单个编写器、多个读者是安全的这一事实。 (push
中m_position
的非原子增量使得它对于多个编写器来说显然是不安全的。
即便如此,即使同时运行 1 个写入器 + 1 个阅读器,这也有点奇怪。 如果在写入过程中开始读取,它将获得"旧"值,而不是等待几分之一微秒来获取新值。 然后下次读取时,已经有一个新值在等待;上次错过的那个。 所以例如m_position
可以按以下顺序更新:2、-1、3。
这可能是可取的,也可能不是可取的,这取决于"过时"数据是否有任何价值,以及如果写入器在写入过程中睡觉,读者阻塞的可接受性。 甚至没有作家睡觉,等待旋转。
具有多个只读读取器的很少写入的小型数据的标准模式是 SeqLock。 例如,用于在无法原子读取或写入 128 位值的 CPU 上发布 128 位当前时间戳。 请参阅使用 32 位原子实现 64 位原子计数器
可能的设计更改
为了安全起见,我们可以让编写器自由运行,始终环绕其圆形缓冲区,并让读者跟踪它查看的最后一个元素。
如果只有一个读取器,这应该是一个简单的非原子变量。 如果它是一个实例变量,至少把它放在m_buffer[]
写位置的另一边。
// Possible failure mode: writer wraps around between reads, leaving same m_position
// single-reader
const bool read(T &elem)
{
// FIXME: big hack to get this in a separate cache line from the instance vars
// maybe instead use alignas(64) int m_lastread as a class member, and/or on the other side of m_buffer from m_position.
static int lastread = -1;
int wPos = m_position.load(std::memory_order_acquire); // or cheat with relaxed to get asm that's like "consume"
if (lastread == wPos)
return false;
elem = m_buffer[wPos];
lastread = wPos;
return true;
}
您希望lastread
与编写器编写的内容不同的缓存行中。 否则,读者对readPos的更新会变慢,因为与作者的写作错误共享,反之亦然。
这让读取器真正是只读的,即编写器写入的缓存行。 但是,在编写器写入处于"已修改"状态的行后,仍需要 MESI 流量来请求对行的读取访问权限。 但是编写器仍然可以读取m_position
而不会丢失缓存,因此它可以立即将其存储放入存储缓冲区。 它只需等待 RFO 获得缓存行的独占所有权,然后就可以将元素和更新的m_position
从其存储缓冲区提交到 L1d 缓存。
TODO:让m_position
增加而无需手动包装,因此我们有一个需要很长时间才能包装的写入序列号,避免了lastread == wPos
早期的假阴性。
使用wPos & (maxElements-1)
作为索引。 和static_assert(maxElements & (maxElements-1) == 0, "maxElements must be a power of 2");
然后,唯一的危险是在很小的时间窗口中未被发现撕裂,如果编写器一直绕行并正在写入正在读取的元素。 对于频繁读取和不频繁写入,以及不太小的缓冲区,这应该永远不会发生。 读取后再次检查m_position
(如SeqLock,类似于下面)将比赛窗口缩小到仅仍在进行的写入。
如果有多个读取器,另一个不错的选择可能是每个m_buffer
条目中的claimed
标志。 所以你会定义
template<typename T>
class WaitFreePublish
{
private:
struct {
alignas(32) T elem; // at most 2 elements per cache line
std::atomic<int8_t> claimed; // writers sets this to 0, readers try to CAS it to 1
// could be bool if we don't end up needing 3 states for anything.
// set to "1" in the constructor? or invert and call it "unclaimed"
} m_buffer[maxElements];
std::atomic<int> m_position {-1};
}
如果T
最后有填充,很遗憾我们不能利用它作为claimed
标志:/
这避免了比较位置的可能失败模式:如果写入器在读取之间绕行,我们得到的最糟糕的情况是撕裂。 我们可以通过让编写器先清除claimed
标志来检测这种撕裂,然后再编写元素的其余部分。
没有其他线程写入m_position
,我们绝对可以使用宽松的负载而不必担心。 我们甚至可以在其他地方缓存写入位置,但希望读取器不会经常使包含m_position
的缓存行无效。 显然,在您的用例中,写入器性能/延迟可能不是什么大问题。
因此,编写器 + 读取器可能如下所示,使用 SeqLock 样式的撕裂检测,使用声明的标志、元素和m_position的已知更新顺序。
/// claimed flag per array element supports concurrent readers
// thread-safety: single-writer only
// update claimed flag first, then element, then m_position.
void publish(const T& elem)
{
const int wPos = m_position.load(std::memory_order_relaxed);
const int nextPos = getNextPos(wPos);
m_buffer[nextPos].claimed.store(0, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_release); // make sure that `0` is visible *before* the non-atomic element modification
m_buffer[nextPos].elem = elem;
m_position.store(nextPos, std::memory_order_release);
}
// thread-safety: multiple readers are ok. First one to claim an entry gets it
// check claimed flag before/after to detect overwrite, like a SeqLock
const bool read(T &elem)
{
int rPos = m_position.load(std::memory_order_acquire);
int8_t claimed = m_buffer[rPos].claimed.load(std::memory_order_relaxed);
if (claimed != 0)
return false; // read-only early-out
claimed = 0;
if (!m_buffer[rPos].claimed.compare_exchange_strong(
claimed, 1, std::memory_order_acquire, std::memory_order_relaxed))
return false; // strong CAS failed: another thread claimed it
elem = m_buffer[rPos].elem;
// final check that the writer didn't step on this buffer during read, like a SeqLock
std::atomic_thread_fence(std::memory_order_acquire); // LoadLoad barrier
// We expect it to still be claimed=1 like we set with CAS
// Otherwise we raced with a writer and elem may be torn.
// optionally retry once or twice in this case because we know there's a new value waiting to be read.
return m_buffer[rPos].claimed.load(std::memory_order_relaxed) == 1;
// Note that elem can be updated even if we return false, if there was tearing. Use a temporary if that's not ok.
}
与 CAS 强相比,使用claimed = m_buffer[rPos].exchange(1)
并检查claimed==0
将是另一种选择。 也许在 x86 上效率稍高一些。 在 LL/SC 机器上,我想如果 CAS 发现与expected
不匹配,它可能能够在不进行写入的情况下进行救助,在这种情况下,只读检查毫无意义。
我使用.claimed.compare_exchange_strong(claimed, 1)
成功排序 =acquire
来确保在阅读.elem
之前阅读claimed
发生。
"失败"内存排序可以relaxed
:如果我们看到它已经被另一个线程声明,我们放弃并且不查看任何共享数据。
compare_exchange_strong
存储部分的内存排序可以relaxed
,所以我们只需要mo_acquire
,而不是acq_rel
。 读者不会对共享数据进行任何其他存储,我认为商店的排序无关紧要。到负载。 CAS 是一种原子 RMW。 只有一个线程的 CAS 可以在给定的缓冲区元素上成功,因为它们都试图将其从 0 设置为 1。 这就是原子RMW的工作方式,无论是放松还是seq_cst或介于两者之间的任何东西。
它不需要seq_cst:我们不需要刷新存储缓冲区或其他任何东西来确保存储在此线程读取.elem
之前可见。 仅仅作为一个原子 RMW 就足以阻止多个线程真正认为它们成功了。 发布只会确保它不能在宽松的只读检查之前更早移动。 这不是一个正确性问题。 希望没有 x86 编译器会在编译时这样做。 (在 x86 上的运行时,RMW 原子操作始终seq_cst。
我认为作为一个 RMW 使它不可能"踩到"作家的写作(在环绕之后)。 但这可能是实际的 CPU 实现细节,而不是 ISO C++。 在任何给定.claimed
的全局修改顺序中,我认为 RMW 保持在一起,并且"获取"顺序确实使其领先于.elem
的读取。 但是,不属于 RMW 的release
存储将是一个潜在的问题:编写器可以环绕并claimed=0
放入新条目中,然后读者的存储最终可以提交并将其设置为 1,而实际上没有读者曾经阅读过该元素。
如果我们非常确定读取器不需要检测循环缓冲区的编写器环绕,请省略编写器和读取器中的std::atomic_thread_fence
。 (声明的和非原子元素存储仍将由发布存储区排序到m_position
)。 可以简化读取器以省略第二次检查,如果它通过 CAS,则始终返回 true。
请注意,m_buffer[nextPos].claimed.store(0, std::memory_order_release);
不足以阻止以后的非原子存储出现在它之前:发布存储是一个单向屏障,与发布围栏不同。 释放围栏就像一个双向商店商店屏障。 (在x86上免费,在其他ISA上便宜。
不幸的是,这种SeqLock风格的撕裂检测在技术上并不能避免C++抽象机器中的UB。 在ISO C++中没有好的/安全的方式来表达这种模式,而且众所周知,在真实硬件上的asm中它是安全的。 实际上没有任何东西使用撕裂的值(假设read()
的调用者如果返回 false,则忽略其elem
值)。
使elem
成为std::atomic<T>
将破坏整个目的:这将使用自旋锁来获得原子性,因此它也可以直接使用它。
使用volatile T elem
会破坏buffer[i].elem = elem
因为与 C 不同,C++ 不允许将易失性结构复制到常规结构/从常规结构复制。 (易失性结构 = 结构不可能,为什么? 对于 SeqLock 类型的模式来说,这非常烦人,在这种模式中,您希望编译器发出高效的代码来复制整个对象表示形式,可以选择使用 SIMD 向量。 如果您编写一个接受volatile &T
参数并执行单个成员的构造函数或赋值运算符,则不会得到这一点。 很明显volatile
是错误的工具,这只会留下编译器内存屏障,以确保在屏障之前完全读取或完全写入非原子对象。我认为std::atomic_thread_fence
实际上是安全的,就像GNU C中的asm("" ::: "memory")
一样。 它在实践中适用于当前的编译器。