我们发现,在代码中有几个地方,由互斥体保护的数据的并发读取相当常见,而写入则很少。我们的测量结果似乎表明,使用简单的互斥会严重阻碍代码读取数据的性能。所以我们需要的是一个多读/单写互斥锁。我知道这可以建立在更简单的基元之上,但在我尝试之前,我宁愿询问现有的知识:
用更简单的同步原语构建多读/单写锁定的认可方法是什么
我确实知道如何做到这一点,但我宁愿得到公正的答案,因为我(可能是错误的)想出了什么。(注意:我期望的是解释如何做到这一点,可能是在伪代码中,而不是一个完整的实现。我当然可以自己写代码。)
注意事项:
-
这需要有合理的性能。(我想的是,每次访问需要两次锁定/解锁操作。现在这可能不够好,但需要许多操作似乎不合理。)
-
通常,读取次数更多,但写入比读取更重要,对性能更敏感。读者决不能饿死作家。
-
我们被困在一个相当旧的嵌入式平台上(VxWorks 5.5的专有变体),有一个相当老的编译器(GCC 4.1.2)和boost 1.52——除了boost的大部分部分依赖于POSIX之外,因为POSIX并没有在该平台上完全实现。可用的锁定原语基本上是几种信号量(二进制、计数等),在此基础上我们已经创建了互斥、条件变量和监视器。
-
这是IA32,单核。
乍一看,我以为我认出了这个答案与Alexander Terekhov介绍的算法相同。但经过研究,我认为它是有缺陷的。两个写入程序可以同时等待m_exclusive_cond
。当其中一个写入程序唤醒并获得独占锁时,它将在unlock
上设置exclusive_waiting_blocked = false
,从而将互斥锁设置为不一致状态。在那之后,互斥体很可能被冲洗掉。
N2406,其首先提出std::shared_mutex
包含部分实现,下面用更新的语法重复该部分实现。
class shared_mutex
{
mutex mut_;
condition_variable gate1_;
condition_variable gate2_;
unsigned state_;
static const unsigned write_entered_ = 1U << (sizeof(unsigned)*CHAR_BIT - 1);
static const unsigned n_readers_ = ~write_entered_;
public:
shared_mutex() : state_(0) {}
// Exclusive ownership
void lock();
bool try_lock();
void unlock();
// Shared ownership
void lock_shared();
bool try_lock_shared();
void unlock_shared();
};
// Exclusive ownership
void
shared_mutex::lock()
{
unique_lock<mutex> lk(mut_);
while (state_ & write_entered_)
gate1_.wait(lk);
state_ |= write_entered_;
while (state_ & n_readers_)
gate2_.wait(lk);
}
bool
shared_mutex::try_lock()
{
unique_lock<mutex> lk(mut_, try_to_lock);
if (lk.owns_lock() && state_ == 0)
{
state_ = write_entered_;
return true;
}
return false;
}
void
shared_mutex::unlock()
{
{
lock_guard<mutex> _(mut_);
state_ = 0;
}
gate1_.notify_all();
}
// Shared ownership
void
shared_mutex::lock_shared()
{
unique_lock<mutex> lk(mut_);
while ((state_ & write_entered_) || (state_ & n_readers_) == n_readers_)
gate1_.wait(lk);
unsigned num_readers = (state_ & n_readers_) + 1;
state_ &= ~n_readers_;
state_ |= num_readers;
}
bool
shared_mutex::try_lock_shared()
{
unique_lock<mutex> lk(mut_, try_to_lock);
unsigned num_readers = state_ & n_readers_;
if (lk.owns_lock() && !(state_ & write_entered_) && num_readers != n_readers_)
{
++num_readers;
state_ &= ~n_readers_;
state_ |= num_readers;
return true;
}
return false;
}
void
shared_mutex::unlock_shared()
{
lock_guard<mutex> _(mut_);
unsigned num_readers = (state_ & n_readers_) - 1;
state_ &= ~n_readers_;
state_ |= num_readers;
if (state_ & write_entered_)
{
if (num_readers == 0)
gate2_.notify_one();
}
else
{
if (num_readers == n_readers_ - 1)
gate1_.notify_one();
}
}
该算法源自Alexander Terekhov的一篇旧新闻组帖子。它既不让读者也不让作家挨饿。
有两个"门",gate1_
和gate2_
。读写器必须通过gate1_
,并且在尝试这样做时可能会被阻止。一旦读写器通过gate1_
,它就读取锁定了互斥体。读者可以通过gate1_
,只要拥有所有权的读者数量不超过上限,只要作者没有通过gate1_
。
一次只能有一个写入程序通过gate1_
。即使读者拥有所有权,作家也可以通过gate1_
。但一旦过了gate1_
,作家仍然没有所有权。它必须首先通过gate2_
。在所有拥有所有权的读者都放弃gate2_
之前,写入程序无法通过CCD_15。回想一下,当写入程序在gate2_
等待时,新读者无法通过CCD16。当一个作家在gate2_
等待时,一个新的作家也不能通过gate1_
。
读者和作者在gate1_
上都被阻止,并且为了通过它而施加了(几乎)相同的要求,这一特点使该算法对读者和作者都公平,两者都不挨饿。
互斥"状态"有意保留在一个单词中,以表明对某些状态更改部分使用原子(作为优化)是可能的(即,对于未受控制的"快速路径")。然而,这里没有演示这种优化。一个例子是,如果编写器线程可以原子地将state_
从0更改为write_entered
,则他获得锁定而不必阻止甚至锁定/解锁mut_
。CCD_ 24可以用原子存储来实现。等等。这里没有显示这些优化,因为它们比这个简单的描述听起来更难正确实现。
似乎只有mutex和condition_variable作为同步原语。因此,我在这里写了一个读者-作家锁,它让读者感到饥饿。它使用一个互斥、两个conditionalvariable和三个integer。
readers - readers in the cv readerQ plus the reading reader
writers - writers in cv writerQ plus the writing writer
active_writers - the writer currently writing. can only be 1 or 0.
它以这种方式饿死了读者。如果有几个作家想写,那么在所有作家写完之前,读者永远不会有机会阅读。这是因为以后的读者需要检查writers
变量。同时,active_writers
变量将保证一次只能有一个写入程序进行写入。
class RWLock {
public:
RWLock()
: shared()
, readerQ(), writerQ()
, active_readers(0), waiting_writers(0), active_writers(0)
{}
void ReadLock() {
std::unique_lock<std::mutex> lk(shared);
while( waiting_writers != 0 )
readerQ.wait(lk);
++active_readers;
lk.unlock();
}
void ReadUnlock() {
std::unique_lock<std::mutex> lk(shared);
--active_readers;
lk.unlock();
writerQ.notify_one();
}
void WriteLock() {
std::unique_lock<std::mutex> lk(shared);
++waiting_writers;
while( active_readers != 0 || active_writers != 0 )
writerQ.wait(lk);
++active_writers;
lk.unlock();
}
void WriteUnlock() {
std::unique_lock<std::mutex> lk(shared);
--waiting_writers;
--active_writers;
if(waiting_writers > 0)
writerQ.notify_one();
else
readerQ.notify_all();
lk.unlock();
}
private:
std::mutex shared;
std::condition_variable readerQ;
std::condition_variable writerQ;
int active_readers;
int waiting_writers;
int active_writers;
};
由互斥体保护的数据的并发读取相当常见,而写入则很少
这听起来像是用户空间RCU:的理想场景
URCU与Linux内核类似,提供了读写器锁定的替代品,以及其他用途。这种相似性继续存在于读卡器不直接与RCU更新程序同步,从而使RCU读取端代码路径非常快,同时允许RCU读卡器即使在与RCU升级程序同时运行时也能取得有用的正向进展,反之亦然。
您可以使用一些好技巧来提供帮助。
首先,良好的性能。VxWorks以其非常好的上下文切换时间而闻名。无论你使用什么锁定解决方案,它都可能涉及信号量。我不会害怕使用信号量(复数),它们在VxWorks中得到了很好的优化,快速的上下文切换时间有助于最小化评估许多信号量状态等导致的性能下降。
此外,我会忘记使用POSIX信号量,它只是将被分层在VxWork自己的信号量之上。VxWorks提供本机计数、二进制和互斥信号量;用一个合适的会让一切都快一点。二进制一有时可能非常有用;发布到多次,从不超过值1。
其次,写入比读取更重要。当我在VxWorks中遇到这种需求并使用信号量来控制访问时,我使用任务优先级来指示哪个任务更重要,应该首先访问资源。这相当有效;实际上,VxWorks中的一切都是一项任务(好吧,线程),就像其他任何任务一样,包括所有的设备驱动程序等。
VxWorks还解决优先级倒置(Linus Torvalds讨厌这种事情)。因此,如果您使用信号量实现锁定,那么如果优先级较低的读卡器阻塞了优先级较高的写入程序,您可以依靠操作系统调度程序来启动它们。它可以产生更简单的代码,而且你也可以充分利用操作系统。
因此,一个潜在的解决方案是使用一个VxWorks计数信号量来保护资源,初始化为等于读卡器数量的值。每次读卡器想要读取时,它都会获取信号量(将计数减少1)。每次读取完成后,它都会发布信号量,使计数增加1。每次编写器想要编写时,它都会获取信号量n次(n=读取器数量),并在完成后发布n次。最后使编写器任务的优先级高于任何阅读器,并依赖于操作系统的快速上下文切换时间和优先级反转。
请记住,您是在硬实时操作系统上编程,而不是在Linux上编程。获取/发布本机VxWorks信号量所需的运行时间与Linux上的类似操作不一样,尽管现在即使是Linux也很好(我现在使用PREEMPT_RT)。VxWorks调度程序和所有设备驱动程序都可以依赖于它们的行为。如果你愿意,你甚至可以让你的编写器任务成为整个系统中的最高优先级,甚至高于所有设备驱动程序!
为了帮助事情向前发展,还要考虑每个线程正在做什么。VxWorks允许您指示任务是否正在使用FPU。如果您使用的是本机VxWorks TaskSpawn例程而不是pthread_create,那么您就有机会指定它。这意味着,如果你的线程/任务没有进行任何浮点运算,并且你在对TaskSpawn的调用中也说过,那么上下文切换时间会更快,因为调度器不会费心保留/恢复FPU状态。
这很有可能成为您正在开发的平台上的最佳解决方案。它发挥了操作系统的优势(快速信号量、快速上下文切换时间),而不引入大量额外代码来重新创建其他平台上常见的替代(可能更优雅)解决方案。
第三,使用旧的GCC和旧的Boost。基本上,除了给WindRiver打电话讨论购买升级版的低价值建议外,我无能为力。就我个人而言,当我为VxWorks编程时,我使用了VxWork的原生API,而不是POSIX。好吧,所以代码不是很便携,但它最终很快;无论如何,POSIX只是本机API之上的一层,这总是会减慢速度。
也就是说,POSIX计数和互斥信号量与VxWork的本机计数和互斥量信号量非常相似。这可能意味着POSIX分层不是很厚。
关于VxWorks编程的一般说明
调试我一直试图使用Solaris可用的开发工具(Tornado)。这是迄今为止我遇到过的最好的多线程调试环境。为什么?它允许您启动多个调试会话,系统中的每个线程/任务都有一个调试会话。最终,每个线程都会有一个调试窗口,并且您将分别独立地对每个线程进行调试。跨过一个阻塞操作,调试窗口就会被阻塞。将鼠标焦点移到另一个调试窗口,跳过将释放块的操作,并观察第一个窗口完成其步骤。
您最终会得到很多调试窗口,但这是迄今为止调试多线程内容的最佳方式。它使得写非常复杂的东西和看到问题变得非常容易。您可以轻松地探索应用程序中不同的动态交互,因为您可以简单而强大地控制每个线程在任何时候都在做什么。
具有讽刺意味的是,Windows版的"龙卷风"并没有让你这么做;每个系统只有一个糟糕的调试窗口,就像Visual Studio等任何其他无聊的旧IDE一样。我从未见过即使是现代IDE在多线程调试方面也能像Solaris上的Tornado那样出色。
硬盘如果您的读写器正在使用磁盘上的文件,请考虑VxWorks 5.5相当旧。像NCQ这样的东西不会得到支持。在这种情况下,我提出的解决方案(如上所述)最好使用单个互斥信号量,以防止多个读卡器在读取磁盘的不同部分时相互绊倒。这取决于你的读卡器到底在做什么,但如果他们从文件中读取连续的数据,这将避免读/写头在磁盘表面来回摆动(非常慢)。
在我的案例中,我使用这个技巧来塑造网络接口上的流量;每个任务都发送不同种类的数据,任务优先级反映了网络上数据的优先级。它非常优雅,没有任何消息是零散的,但重要的消息获得了可用带宽的最大份额。
一如既往,最佳解决方案取决于细节。读写旋转锁可能是你想要的,但其他方法,如上面建议的读拷贝更新,可能是一个解决方案——尽管在旧的嵌入式平台上,使用的额外内存可能是个问题。对于罕见的写入,我经常使用任务系统来安排工作,这样只有在没有从该数据结构读取时才能进行写入,但这取决于算法。
在读写器并发控制中描述了一种基于信号量和互斥的算法;P.J.Courtois、F.Heymans和D.L.Parnas<MBLE研究实验室;比利时布鲁塞尔>。
这是一个基于Boost标头的简化答案(我会以批准的方式调用Boost)。它只需要条件变量和互斥。我使用Windows原语重写了它,因为我发现它们是描述性的,而且非常简单,但我认为这是伪代码。
这是一个非常简单的解决方案,不支持互斥升级或try_lock()操作。如果你愿意,我可以添加这些。我还去掉了一些不太必要的修饰,比如禁用中断。
此外,值得一看boostthreadpthreadshared_mutex.hpp
(这是基于此)。它是人类可读的。
class SharedMutex {
CRITICAL_SECTION m_state_mutex;
CONDITION_VARIABLE m_shared_cond;
CONDITION_VARIABLE m_exclusive_cond;
size_t shared_count;
bool exclusive;
// This causes write blocks to prevent further read blocks
bool exclusive_wait_blocked;
SharedMutex() : shared_count(0), exclusive(false)
{
InitializeConditionVariable (m_shared_cond);
InitializeConditionVariable (m_exclusive_cond);
InitializeCriticalSection (m_state_mutex);
}
~SharedMutex()
{
DeleteCriticalSection (&m_state_mutex);
DeleteConditionVariable (&m_exclusive_cond);
DeleteConditionVariable (&m_shared_cond);
}
// Write lock
void lock(void)
{
EnterCriticalSection (&m_state_mutex);
while (shared_count > 0 || exclusive)
{
exclusive_waiting_blocked = true;
SleepConditionVariableCS (&m_exclusive_cond, &m_state_mutex, INFINITE)
}
// This thread now 'owns' the mutex
exclusive = true;
LeaveCriticalSection (&m_state_mutex);
}
void unlock(void)
{
EnterCriticalSection (&m_state_mutex);
exclusive = false;
exclusive_waiting_blocked = false;
LeaveCriticalSection (&m_state_mutex);
WakeConditionVariable (&m_exclusive_cond);
WakeAllConditionVariable (&m_shared_cond);
}
// Read lock
void lock_shared(void)
{
EnterCriticalSection (&m_state_mutex);
while (exclusive || exclusive_waiting_blocked)
{
SleepConditionVariableCS (&m_shared_cond, m_state_mutex, INFINITE);
}
++shared_count;
LeaveCriticalSection (&m_state_mutex);
}
void unlock_shared(void)
{
EnterCriticalSection (&m_state_mutex);
--shared_count;
if (shared_count == 0)
{
exclusive_waiting_blocked = false;
LeaveCriticalSection (&m_state_mutex);
WakeConditionVariable (&m_exclusive_cond);
WakeAllConditionVariable (&m_shared_cond);
}
else
{
LeaveCriticalSection (&m_state_mutex);
}
}
};
行为
好吧,这个算法的行为有些混乱,下面是它的工作原理。
写入锁定期间-读卡器和写入器都被阻止。
在写锁定结束时-读线程和一个写线程将竞争以查看哪个线程开始。
在读取锁定期间-写入程序被阻止。读卡器也被阻止当且仅当写入程序被阻止时。
在最终读取锁定发布时-读取器线程和一个编写器线程将竞争以查看哪一个线程启动。
如果处理器在通知期间频繁地将上下文切换到m_exclusive_cond
之前的m_shared_cond
线程,则可能会导致读者饿死编写器,但我怀疑这个问题是理论上的,而不是实际的,因为它是Boost的算法。
现在Microsoft已经开放了.NET源代码,您可以查看他们的ReaderWRiterLockSlim实现。
我不确定它们使用的更基本的基元是否可用,其中一些基元也是.NET库的一部分,它们的代码也可用。
微软已经花了很多时间来提高其锁定机制的性能,所以这可能是一个很好的起点。