我有一个环形缓冲区,看起来像:
template<class T>
class RingBuffer {
public:
bool Publish();
bool Consume(T& value);
bool IsEmpty(std::size_t head, std::size_t tail);
bool IsFull(std::size_t head, std::size_t tail);
private:
std::size_t Next(std::size_t slot);
std::vector<T> buffer_;
std::atomic<std::size_t> tail_{0};
std::atomic<std::size_t> head_{0};
static constexpr std::size_t kBufferSize{8};
};
此数据结构旨在使用两个线程:发布者线程和使用线程。下面列出了没有将内存顺序传递给原子的两个主要函数:
bool Publish(T value) {
const size_t curr_head = head_.load(/* memory order */);
const size_t curr_tail = tail_.load(/* memory_order */);
if (IsFull(curr_head, curr_tail)) {
return false;
}
buffer_[curr_tail] = std::move(value);
tail_.store(Next(curr_tail) /*, memory order */);
return true;
}
bool Consume(T& value) {
const size_t curr_head = head_.load(/* memory order */);
const size_t curr_tail = tail_.load(/* memory order */);
if (IsEmpty(curr_head, curr_tail)) {
return false;
}
value = std::move(buffer_[curr_head]);
head_.store(Next(curr_head) /*, memory order */);
return true;
}
我知道,至少,我必须在Publish()
函数中使用std::memory_order::release
tail_.store()
,在Consume()
函数中tail_.load()
std::memory_order::acquire
才能在写入buffer_
和读取buffer_
之间的连接之前创建。此外,我可以std::memory_order::relaxed
传递给Publish()
tail_.load()
和Consume()
head_.load()
,因为同一线程将看到对原子的最后一次写入。现在函数是这样的:
bool Publish(T value) {
const size_t curr_head = head_.load(/* memory order */);
const size_t curr_tail = tail_.load(std::memory_order::relaxed);
if (IsFull(curr_head, curr_tail)) {
return false;
}
buffer_[curr_tail] = std::move(value);
tail_.store(Next(curr_tail), std::memory_order::release);
return true;
}
bool Consume(T& value) {
const size_t curr_head = head_.load(std::memory_order::relaxed);
const size_t curr_tail = tail_.load(std::memory_order::acquire);
if (IsEmpty(curr_head, curr_tail)) {
return false;
}
value = std::move(buffer_[curr_head]);
head_.store(Next(curr_head) /*, memory order */);
return true;
}
最后一步是将内存顺序放在剩下的一对中:head_.load()
在Publish()
中,head_.store()
在Consume()
中。我必须在head_.store()
之前执行value = std::move(buffer_[curr_head]);
行Consume()
,否则在缓冲区已满的情况下我将进行数据竞争,因此,至少,我必须std::memory_order::release
传递给该存储操作以避免重新排序。但是我是否必须在Publish()
函数中head_.load()
std::memory_order::acquire
?我知道与std::memory_order::relaxed
不同,在合理的时间内查看head_.store()
会有所帮助head_.load()
,但是如果我不需要这种更短时间的保证来查看商店运营的副作用,我可以有一个放松的记忆订单吗?如果我不能,那为什么?完成的代码:
bool Publish(T value) {
const size_t curr_head = head_.load(std::memory_order::relaxed); // or acquire?
const size_t curr_tail = tail_.load(std::memory_order::relaxed);
if (IsFull(curr_head, curr_tail)) {
return false;
}
buffer_[curr_tail] = std::move(value);
tail_.store(Next(curr_tail), std::memory_order::release);
return true;
}
bool Consume(T& value) {
const size_t curr_head = head_.load(std::memory_order::relaxed);
const size_t curr_tail = tail_.load(std::memory_order::acquire);
if (IsEmpty(curr_head, curr_tail)) {
return false;
}
value = std::move(buffer_[curr_head]);
head_.store(Next(curr_head), std::memory_order::release);
return true;
}
每个原子的内存顺序是否正确?我关于在每个原子变量中使用每个内存顺序的解释是否正确?
以前的答案可能会有所帮助 背景:
c ++, std::atomic, 什么是 std::memory_order 以及如何使用它们?
https://bartoszmilewski.com/2008/12/01/c-atomics-and-memory-ordering/
首先,您描述的系统称为单一生产者 - 单一使用者队列。您可以随时查看此容器的提升版本进行比较。我经常会检查提升代码,即使我在不允许提升的情况下工作。这是因为检查和理解稳定的解决方案将使您深入了解可能遇到的问题(他们为什么这样做?哦,我明白了 - 等等)。 鉴于您的设计,并且编写了许多类似的容器,我会说您的设计必须小心区分空和满。如果您使用经典的 {begin,end} 对,则会遇到由于包装而导致的问题
{开始,开始+大小} == {开始,开始} == 空
好的,所以反向同步问题。
鉴于顺序仅影响重新排序,在发布中使用发布似乎是教科书式的标志使用。在容器的大小递增之前,不会读取该值,因此您不关心值本身的写入顺序是否以随机顺序发生,您只关心在增加计数之前必须完全写入该值。所以我同意,您正确地使用了发布函数中的标志.
我确实质疑消费中是否需要"发布",但是如果您要移出队列,并且这些移动是副作用,则可能需要。我想说的是,如果您追求原始速度,那么可能值得制作第二个版本,该版本专门用于琐碎的对象,使用宽松的顺序来增加头部。
您也可以考虑在推送/弹出时就地新建/删除。 虽然大多数移动会使对象处于空状态,但标准只要求它在移动后保持有效状态。 在移动后显式删除对象可能会使您以后免于晦涩的错误。
你可以争辩说,消耗的两个原子负荷可能是memory_order_consume的。这放宽了约束,可以说"我不在乎它们的加载顺序,只要它们在使用时都加载了"。尽管我在实践中怀疑它会产生任何收益。我也对这个建议感到紧张,因为当我查看增强版本时,它非常接近您拥有的版本。 https://www.boost.org/doc/libs/1_66_0/boost/lockfree/spsc_queue.hpp
template <typename Functor>
bool consume_one(Functor & functor, T * buffer, size_t max_size)
{
const size_t write_index = write_index_.load(memory_order_acquire);
const size_t read_index = read_index_.load(memory_order_relaxed);
if ( empty(write_index, read_index) )
return false;
T & object_to_consume = buffer[read_index];
functor( object_to_consume );
object_to_consume.~T();
size_t next = next_index(read_index, max_size);
read_index_.store(next, memory_order_release);
return true;
}
总的来说,虽然你的方法看起来不错,并且与提升版本非常相似。 但......如果我是你,我可能会逐行浏览增强版本,看看它有什么不同。很容易犯错误。
编辑: 抱歉,我刚刚注意到您在代码中错误地使用了 memory_order_acquire/memory_order_relaxed 标志的顺序。 您应该使最后一次写入为"发布",第一次写入为"获取"。这不会显着影响行为,但它使它更易于阅读。(在开始时同步,在结束时同步)
对评论的回复: 正如@user17732522所暗示的,复制操作也是写入操作,因此对琐碎对象的优化将不会同步,而对琐碎对象的优化将引入 U/B(该死的很容易出错)
为了理解这个问题的正确std::memory_order
,让我们考虑一下是否只有一个线程而不是生产者和使用者线程。
在单线程方案中,bool Publish(T value)
将按相同顺序查看前bool Consume(T& value)
执行的所有操作,类似地bool Consume(T& value)
将按相同顺序查看前bool Publish(T value)
执行的所有操作。
因此,在多线程方案中,发布服务器和使用者线程必须以类似的方式同步。同步可以通过memory barriers
来实现。
Consumption 函数release
原子存储head_.store(Next(curr_head), std::memory_order::release);
保证在store
之前发出的操作将在它之前执行并且不会越过它,并且发布可以通过head_
变量的原子负载来同步acquire
并且保证如果发布可以看到head_
它将看到之前执行的所有操作,
bool Publish(T value) {
const size_t curr_head = head_.load(std::memory_order::acquire);
const size_t curr_tail = tail_.load(std::memory_order::relaxed);
if (IsFull(curr_head, curr_tail)) {
return false;
}
buffer_[curr_tail] = std::move(value);
tail_.store(Next(curr_tail), std::memory_order::release);
return true;
}
类似地,Consumption 可以通过tail_
变量的原子负载与发布release
原子存储tail_.store(Next(curr_tail), std::memory_order::release);
同步acquire
如果 consumption 可以看到tail_
则保证看到在其之前执行的所有操作。
bool Consume(T& value) {
const size_t curr_tail = tail_.load(std::memory_order::acquire);
const size_t curr_head = head_.load(std::memory_order::relaxed);
if (IsEmpty(curr_head, curr_tail)) {
return false;
}
value = std::move(buffer_[curr_head]);
head_.store(Next(curr_head), std::memory_order::release);
return true;
}