C++信号量(semi*lockfree*),在哪里可以得到一个



edit:这不是允许post()中互斥锁的任何问题的重复。请仔细阅读,我需要一个无锁定的帖子()!如果你没有一个真实的答案,就不要在这个副本上做标记。

信号量(像在linux中一样)是一个有用的构建块,在c++标准中找不到,在boost(目前)中也找不到。我主要讨论单个进程的线程之间的信号量,通过抢占式调度程序。

我特别感兴趣的是它们是非阻塞的(即无锁定),除非它真的需要阻塞。也就是说,post()和try_wait()应该始终是无锁定的。如果wait()调用在返回了足够多的post()之后强烈发生,那么它们应该是无锁定的。另外,阻塞等待()应该由调度程序阻塞,而不是旋转锁定。如果我还想要一个带timeout的wait_for-在避免饥饿的同时,它会使实现进一步复杂化多少?

信号量不在标准中有原因吗?

第3版:所以,我不知道标准P0514R4中有一个提案正好处理了这些问题,并且除了特别添加std::信号量之外,它还解决了这里提出的所有问题。http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2018/p0514r4.pdf

boost也没有这些。具体来说,进程间的进程是旋转锁定的。

哪些库支持这样的功能?

有可能在windowsapi和其他广泛使用的系统上实现它吗?

edit:使用atomics+mutex+condition_variable不可能实现无锁定-您必须在post中进行阻塞或在等待中进行旋转。如果您想要一个无锁定的post(),则不能在post(()中锁定互斥对象。我想在一个可能抢占的调度程序上运行,我不希望post()被其他线程阻塞,这些线程占用了互斥并被抢占。那么,这不是像C++0x这样没有信号量的问题的重复吗?如何同步线程?

第2版:下面是一个示例实现,只是为了演示使用atomics+mutex+condvar、AFAIK可以实现的最佳效果。post()和wait()执行一个无锁compare_exchange,只有在必须执行时,它们才会锁定互斥对象。

但是,post()不是无锁定的。更糟糕的是,它可能会被一个wait()阻塞,该函数锁定了互斥对象并被抢占。

为了简单起见,我只实现了post_one()和wait_one_For(Duration),而不是post(int)和wait-fo(int,Duration)。此外,我假设没有虚假的唤醒,这不是标准所承诺的。

class semaphore //provides acquire release memory ordering for the user
{
private:
using mutex_t = std::mutex;
using unique_lock_t = std::unique_lock<mutex_t>;
using condvar_t = std::condition_variable;
using counter_t = int;
std::atomic<counter_t> atomic_count_; 
mutex_t mutex_;
condvar_t condvar_;
counter_t posts_notified_pending_;
counter_t posts_unnotified_pending_;
counter_t waiters_running_;
counter_t waiters_aborted_pending_;
public:
void post_one()
{
counter_t start_count = atomic_count_.fetch_add(+1, mo_acq_rel);
if (start_count < 0) {
unique_lock_t lock(mutex_);
if (0 < waiters_running_) {
++posts_notified_pending_;
condvar_.notify_one();
}
else {
if (0 == waiters_aborted_pending_) {
++posts_unnotified_pending_;
}
else {
--waiters_aborted_pending_;
}
}
}
}
template< typename Duration >
bool wait_one_for(Duration timeout)
{
counter_t start_count = atomic_count_.fetch_add(-1, mo_acq_rel);
if (start_count <= 0) {
unique_lock_t a_lock(mutex_);
++waiters_running_;
BOOST_SCOPE_EXIT(&waiters_running_) {
--waiters_running_;
} BOOST_SCOPE_EXIT_END
if( ( 0 == posts_notified_pending_ ) && ( 0 < posts_unnotified_pending_ ) ) {
--posts_unnotified_pending_;
return true;
}
else {
auto wait_result = condvar_.wait_for( a_lock, timeout);
switch (wait_result) {
case std::cv_status::no_timeout: {
--posts_notified_pending_;
return true;
} break;
case std::cv_status::timeout: {
counter_t abort_count = atomic_count_.fetch_add(+1, mo_acq_rel);
if (abort_count >= 0) {
/*too many post() already increased a negative atomic_count_ and will try to notify, let them know we aborted. */
++waiters_aborted_pending_;
}
return false;
} break;
default: assert(false); return false;
}
}
}
return true;
}

bool try_wait_one()
{
counter_t count = atomic_count_.load( mo_acquire );
while (true) {
if (count <= 0) {
return false;
}
else if (atomic_count_.compare_exchange_weak(count, count-1, mo_acq_rel, mo_relaxed )) {
return true;
}
}
}
};

是的,只要你的操作系统提供了一个合适的"park"one_answers"unpark"机制,就可以做到这一点,而不需要为unpark取锁。Park指的是允许线程进入睡眠状态(操作系统阻塞),unpark指的是唤醒该线程。

您已经接近原子计数器和condvar方法。问题是,作为语义的一部分,condvar a mutex是所必需的。所以你必须放弃condvars,降低一点级别。首先,您应该将所有状态(如当前信号量值、是否有任何等待程序(以及可能有多少wwaiter))打包到一个原子值中,并通过比较和交换来原子化地操作它。这样可以防止在将这些值作为单独值时发生的竞争。

然后,您可以绘制一个状态图,显示信号量的所有可能状态,并为所有可能的转换状态提供边(例如,当服务员到达时,"无服务员"状态将转换为"有服务员"状态)。您通过比较和交换实现了所有转换,无论何时失败,您都必须重新计算转换,因为它可能已经更改!

那么您只需要实现阻塞。在Windows上,您可以使用事件-自动或手动重置。两者都有各自的优点和怪癖,而且有不止一种方法可以剥这只猫的皮。例如,您可能会让它与单个共享事件和自动重置事件一起工作。

然而,这里有一个机制的草图,它在无锁队列中使用每线程服务对象。信号量由一个原子操作的控制字和一个元素类型为waiter_node或堆栈的无锁列表组成,或者任何您想要使用的现成并发列表。

我们将假设每个线程都拥有一个waiter_node对象,该对象只包含一个手动重置事件对象。这可以创建一次并存储在TLS中(可能是最高效的),也可以在每次需要等待时按需分配,并在等待完成时取消分配。

以下是基本概述:

等待

  • 如果信号量可用(正),CAS将其递减并继续
  • 如果信号量不可用(零),则线程在其waiter_node上调用ResetEvent,然后将事件推送到服务列表上,检查sem值是否仍然为零,然后在其waiter_node上调用WaitForObject。当返回时,从顶部开始等待例程

发布

  • 增加控制字。弹出一个waiter_node(如果有的话),然后在上面调用SetEvent

这里有各种各样的"竞赛",例如在等待线程睡眠之前,post操作就弹出了waiter_node,但它们应该是良性的。

甚至在这种基于服务员队列的设计中也有许多变体。例如,您可以集成列表"head"和控制字,使它们是同一个东西。然后wait不需要双重检查信号量计数,因为推送操作同时验证信号量状态。您还可以实现"直接切换",在这种情况下,如果有等待程序,posting线程根本不增加控制字,而是弹出一个,并用它已经成功获取信号量的信息唤醒它。

在Linux上,可以将Event替换为futex。实现"单一futex"解决方案更容易,因为futex允许在内核内进行原子检查和块操作,从而避免了Event解决方案中固有的许多竞争。因此,一个基本的草图是一个单独的控制字,您使用CAS原子地进行转换,然后使用futex()FUTEX_WAIT对控制字和块进行第二次原子检查(这种原子检查和睡眠是futex的功能)。

相关内容

  • 没有找到相关文章

最新更新