>我最近使用三重缓冲区的 std::atomic 将端口连接到 C++11,用作并发同步机制。这种线程同步方法背后的想法是,对于生产者-消费者的情况,其中生产者的运行速度比使用者快,三重缓冲可以提供一些好处,因为生产者线程不会因为必须等待使用者而"减慢"。就我而言,我有一个以 ~120fps 更新的物理线程,以及一个以 ~60fps 运行的渲染线程。显然,我希望渲染线程始终获得可能的最新状态,但我也知道,由于速率的差异,我将从物理线程中跳过很多帧。另一方面,我希望我的物理线程保持其恒定的更新速率,而不受锁定数据的较慢渲染线程的限制。
最初的C代码是由remis-thoughts制作的,完整的解释在他的博客中。我鼓励任何有兴趣阅读它的人进一步了解原始实现。
我的实现可以在这里找到。
基本思想是拥有一个具有 3 个位置(缓冲区(的数组和一个原子标志,该标志被比较和交换以定义哪些数组元素对应于任何给定时间的状态。这样,只有一个原子变量用于对数组的所有 3 个索引和三重缓冲背后的逻辑进行建模。缓冲区的 3 个位置分别命名为"脏"、"清洁"和"捕捉"。生成者始终写入脏索引,并且可以翻转编写器以将脏索引与当前干净索引交换。使用者可以请求新的快照,该快照将当前快照索引与清理索引交换以获取最新的缓冲区。使用者始终读取对齐位置的缓冲区。
该标志由一个 8 位无符号 int 组成,位对应于:
(未使用( (新写入( (2x 脏( (2x 清洁( (2x 快照(
newWrite 额外位标志由编写器设置,并由读取器清除。读取器可以使用它来检查自上次快照以来是否有任何写入,如果没有,则不会再进行一次快照。可以使用简单的按位运算获取标志和索引。
现在好的代码:
template <typename T>
class TripleBuffer
{
public:
TripleBuffer<T>();
TripleBuffer<T>(const T& init);
// non-copyable behavior
TripleBuffer<T>(const TripleBuffer<T>&) = delete;
TripleBuffer<T>& operator=(const TripleBuffer<T>&) = delete;
T snap() const; // get the current snap to read
void write(const T newT); // write a new value
bool newSnap(); // swap to the latest value, if any
void flipWriter(); // flip writer positions dirty / clean
T readLast(); // wrapper to read the last available element (newSnap + snap)
void update(T newT); // wrapper to update with a new element (write + flipWriter)
private:
bool isNewWrite(uint_fast8_t flags); // check if the newWrite bit is 1
uint_fast8_t swapSnapWithClean(uint_fast8_t flags); // swap Snap and Clean indexes
uint_fast8_t newWriteSwapCleanWithDirty(uint_fast8_t flags); // set newWrite to 1 and swap Clean and Dirty indexes
// 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap)
// newWrite = (flags & 0x40)
// dirtyIndex = (flags & 0x30) >> 4
// cleanIndex = (flags & 0xC) >> 2
// snapIndex = (flags & 0x3)
mutable atomic_uint_fast8_t flags;
T buffer[3];
};
实现:
template <typename T>
TripleBuffer<T>::TripleBuffer(){
T dummy = T();
buffer[0] = dummy;
buffer[1] = dummy;
buffer[2] = dummy;
flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}
template <typename T>
TripleBuffer<T>::TripleBuffer(const T& init){
buffer[0] = init;
buffer[1] = init;
buffer[2] = init;
flags.store(0x6, std::memory_order_relaxed); // initially dirty = 0, clean = 1 and snap = 2
}
template <typename T>
T TripleBuffer<T>::snap() const{
return buffer[flags.load(std::memory_order_consume) & 0x3]; // read snap index
}
template <typename T>
void TripleBuffer<T>::write(const T newT){
buffer[(flags.load(std::memory_order_consume) & 0x30) >> 4] = newT; // write into dirty index
}
template <typename T>
bool TripleBuffer<T>::newSnap(){
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do {
if( !isNewWrite(flagsNow) ) // nothing new, no need to swap
return false;
} while(!flags.compare_exchange_weak(flagsNow,
swapSnapWithClean(flagsNow),
memory_order_release,
memory_order_consume));
return true;
}
template <typename T>
void TripleBuffer<T>::flipWriter(){
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow,
newWriteSwapCleanWithDirty(flagsNow),
memory_order_release,
memory_order_consume));
}
template <typename T>
T TripleBuffer<T>::readLast(){
newSnap(); // get most recent value
return snap(); // return it
}
template <typename T>
void TripleBuffer<T>::update(T newT){
write(newT); // write new value
flipWriter(); // change dirty/clean buffer positions for the next update
}
template <typename T>
bool TripleBuffer<T>::isNewWrite(uint_fast8_t flags){
// check if the newWrite bit is 1
return ((flags & 0x40) != 0);
}
template <typename T>
uint_fast8_t TripleBuffer<T>::swapSnapWithClean(uint_fast8_t flags){
// swap snap with clean
return (flags & 0x30) | ((flags & 0x3) << 2) | ((flags & 0xC) >> 2);
}
template <typename T>
uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(uint_fast8_t flags){
// set newWrite bit to 1 and swap clean with dirty
return 0x40 | ((flags & 0xC) << 2) | ((flags & 0x30) >> 2) | (flags & 0x3);
}
如您所见,我决定使用释放-消费模式进行内存排序。存储的发布 (memory_order_release( 可确保在存储之后无法对当前线程中的写入重新排序。另一方面,消耗确保当前线程中依赖于当前加载的值的读取在此加载之前无法重新排序。这可确保对释放相同原子变量的其他线程中的因变量的写入在当前线程中可见。
如果我的理解是正确的,因为我只需要原子设置标志,那么编译器可以自由地对不影响标志的其他变量进行操作,从而允许进行更多优化。通过阅读有关新内存模型的一些文档,我也意识到这些宽松的原子只会对ARM和POWER等平台产生明显的影响(它们主要是因为它们而被引入的(。由于我的目标是 ARM,我相信我可以从这些操作中受益,并能够挤出更多的性能。
现在来问问题:
对于此特定问题,我是否正确使用了释放-消费宽松的排序?
谢谢
安德烈
PS:很抱歉这篇文章很长,但我相信需要一些体面的背景来更好地了解问题。
编辑:落实@Yakk的建议:
- 修复了使用直接分配的
newSnap()
和flipWriter()
读取flags
,因此使用默认load(std::memory_order_seq_cst)
。 - 为了清晰起见,将位摆弄操作移至专用功能。
bool
返回类型添加到newSnap()
中,现在在没有新内容时返回 false,否则返回 true。- 使用惯用法将类定义为不可复制
= delete
因为如果使用TripleBuffer
,则复制和赋值构造函数都是不安全的。
编辑2:修复了不正确的描述(谢谢@Useless(。请求新快照并从快照索引读取的是使用者(而不是"编写器"(。很抱歉分心,感谢无用指出。
编辑3:根据@Display Name的建议优化了newSnap()
和flipriter()
功能,每个循环周期有效去除了2个冗余load()
。
为什么要在 CAS 循环中加载两次旧标志值?第一次是 flags.load()
,第二次是 compare_exchange_weak()
,标准在 CAS 失败时指定会将以前的值加载到第一个参数中,在本例中为 flagsNow。
根据 http://en.cppreference.com/w/cpp/atomic/atomic/compare_exchange,">否则,将存储在*this中的实际值加载到预期(执行加载操作(中。所以你的循环正在做的是在失败时,compare_exchange_weak()
重新加载flagsNow
,然后循环重复,第一个语句在加载后立即再次加载它compare_exchange_weak()
。在我看来,您的循环应该将负载拉到回路之外。例如,newSnap()
将是:
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
do
{
if( !isNewWrite(flagsNow)) return false; // nothing new, no need to swap
} while(!flags.compare_exchange_weak(flagsNow, swapSnapWithClean(flagsNow), memory_order_release, memory_order_consume));
和flipWriter()
:
uint_fast8_t flagsNow(flags.load(std::memory_order_consume));
while(!flags.compare_exchange_weak(flagsNow, newWriteSwapCleanWithDirty(flagsNow), memory_order_release, memory_order_consume));
是的,这是memory_order_acquire和memory_order_consume之间的区别,但是当您每秒使用它 180 次左右时,您不会注意到它。如果您想知道数字答案,您可以使用 m2 = memory_order_consume 运行我的测试。只需将producer_or_consumer_Thread更改为类似的东西:
TripleBuffer <int> tb;
void producer_or_consumer_Thread(void *arg)
{
struct Arg * a = (struct Arg *) arg;
bool succeeded = false;
int i = 0, k, kold = -1, kcur;
while (a->run)
{
while (a->wait) a->is_waiting = true; // busy wait
if (a->producer)
{
i++;
tb.update(i);
a->counter[0]++;
}
else
{
kcur = tb.snap();
if (kold != -1 && kcur != kold) a->counter[1]++;
succeeded = tb0.newSnap();
if (succeeded)
{
k = tb.readLast();
if (kold == -1)
kold = k;
else if (kold = k + 1)
kold = k;
else
succeeded = false;
}
if (succeeded) a->counter[0]++;
}
}
a->is_waiting = true;
}
测试结果:
_#_ __Produced __Consumed _____Total
1 39258150 19509292 58767442
2 24598892 14730385 39329277
3 10615129 10016276 20631405
4 10617349 10026637 20643986
5 10600334 9976625 20576959
6 10624009 10069984 20693993
7 10609040 10016174 20625214
8 25864915 15136263 41001178
9 39847163 19809974 59657137
10 29981232 16139823 46121055
11 10555174 9870567 20425741
12 25975381 15171559 41146940
13 24311523 14490089 38801612
14 10512252 9686540 20198792
15 10520211 9693305 20213516
16 10523458 9720930 20244388
17 10576840 9917756 20494596
18 11048180 9528808 20576988
19 11500654 9530853 21031507
20 11264789 9746040 21010829