将全局引用计数的资源与原子同步——这里放松合适吗



我有一个全局引用计数对象obj,我想通过使用原子操作来保护它免受数据竞争的影响:

T* obj;                  // initially nullptr
std::atomic<int> count;  // initially zero

我的理解是,在写入obj之后,我需要使用std::memory_order_release,这样其他线程就会知道它正在创建:

void increment()
{
if (count.load(std::memory_order_relaxed) == 0)
obj = std::make_unique<T>();
count.fetch_add(1, std::memory_order_release);
}

同样,在读取计数器时,我需要使用std::memory_order_acquire,以确保线程具有正在更改的obj的可见性:

void decrement()
{
count.fetch_sub(1, std::memory_order_relaxed);
if (count.load(std::memory_order_acquire) == 0)
obj.reset();
}

我不相信上面的代码是正确的,但我不完全确定为什么。我觉得在调用obj.reset()之后,应该有一个std::memory_order_release操作来通知其他线程。这是正确的吗?

还有其他可能出错的事情吗?或者我对这种情况下的原子操作的理解完全错误吗?

无论内存顺序如何,它都是错误的。

正如@MaartenBamelis所指出的,对于increment的并发调用,对象被构造了两次。并发decrement也是如此:对象被重置两次(这可能导致双重析构函数调用(。

请注意,T* obj;声明和将其用作unique_ptr之间存在分歧,但无论是原始指针还是唯一指针,都不能安全地进行并发修改。在实践中,resetdelete会检查指针是否为null,然后删除并将其设置为null,这些步骤不是原子步骤。

fetch_addfetch_sub是fetch和op,而不仅仅是op,这是有原因的:如果您不使用在操作过程中观察到的值,那么很可能是一场竞赛。

此代码本质上是racey。如果两个线程同时调用increment,而count最初是0,那么两个线程都会将count视为0,并且都会创建obj(并竞相查看保留了哪个副本;由于unique_ptr没有特殊的线程保护,如果其中两个线程一次设置,可能会发生可怕的事情(。

如果两个线程decrement同时(持有最后两个引用(,并且在调用load之前完成fetch_sub,则两者都将resetobj(也是坏的(。

如果一个decrement完成了fetch_sub(到0(,那么另一个线程incrementdecrementload出现之前s,increment将把count看作0并重新初始化。对象是在被替换后被清除,还是在被清除后被替换,或者两者的某种可怕的混合,将取决于incrementfetch_add是在decrementload之前还是之后运行。

简而言之:如果你发现自己对同一个变量使用了两个单独的原子操作,并测试了其中一个的结果(没有循环,就像在比较和交换循环中一样(,那你就错了。

更正确的代码看起来像:

void increment()             // Still not safe
{
// acquire is good for the != 0 case, for a later read of obj
// or would be if the other writer did a release *after* constructing an obj
if (count.fetch_add(1, std::memory_order_acquire) == 0)
obj = std::make_unique<T>();
}
void decrement()
{
if (count.fetch_sub(1, std::memory_order_acquire) == 1)
obj.reset();
}

但即便如此,它也不可靠;不能保证,当count0时,两个线程不能同时调用increment这两个fetch_add,虽然其中一个线程可以保证将count视为0,但所说的0查看线程可能最终会延迟,而将其视为1的线程则假设对象存在并在初始化之前使用它。

我不会发誓这里没有无互斥的解决方案,但处理原子论所涉及的问题几乎肯定不值得头疼。

可以将互斥锁限制在if()分支内部,但获取互斥锁也是一种原子RMW操作(对于一个好的轻量级实现来说,这并不一定有多大帮助(。如果你需要非常好的读取端扩展,你会想研究RCU之类的东西,而不是引用计数,以允许读者真正是只读的,而不是与其他读者竞争。

我真的看不到用原子实现引用计数资源的简单方法。也许有一些聪明的方法我还没有想到,但根据我的经验,聪明并不等于可读。

我的建议是首先使用互斥实现它。然后,您只需锁定互斥锁,检查引用计数,执行任何需要执行的操作,然后再次解锁。保证正确:

std::mutex mutex;
int count;
std::unique_ptr<T> obj;
void increment()
{
auto lock = std::scoped_lock{mutex};
if (++count == 1) // Am I the first reference?
obj = std::make_unique<T>();
}
void decrement()
{
auto lock = std::scoped_lock{mutex};
if (--count == 0) // Was I the last reference?
obj.reset();
}

尽管在这一点上,我只会使用std::shared_ptr,而不是自己管理引用计数:

std::mutex mutex;
std::weak_ptr<T> obj;
std::shared_ptr<T> acquire()
{
auto lock = std::scoped_lock{mutex};
auto sp = obj.lock();
if (!sp)
obj = sp = std::make_shared<T>();
return sp;
}

我相信这也可以确保在构造对象时抛出异常的安全性。

Mutex的性能令人惊讶,所以我认为锁定代码非常快,除非您有一个高度专业化的用例,需要代码无锁。

最新更新