有没有办法测试自己编写的C++信号量?



我读了Allen B. Downey的"信号量小书",意识到我必须先在C++中实现信号量,因为它们仅在C++20中显示为标准库的一部分。

我使用了书中的定义:

信号量类似于整数,有三个区别:

  1. 创建信号量时,可以将其值初始化为任意整数, 但在那之后,您唯一被允许执行的操作是增量 (增加一)和减少(减少一)。您无法阅读 信号量的当前值。
  2. 当线程递减信号量时,如果结果为负数,则 线程阻塞自身,在另一个线程递增之前无法继续 信号量。
  3. 当一个线程递增信号量时,如果有其他线程等待- Ing,其中一个等待线程被取消阻止。

我还使用了问题的答案C++0x没有信号量?如何同步线程?

我的实现与链接的实现有点不同,因为我在通知信令线程之前解锁了我的互斥锁,而且书中的定义也有些不同。

所以我实现了信号量,现在我意识到我不知道如何真正正确地测试它,除了最简单的情况,例如为两个线程顺序化两个调用。有没有办法测试实现,比如使用 100 个线程或类似的东西并且没有死锁?我的意思是我应该写什么测试来检查实现?或者,如果检查的唯一方法是仔细查看代码,您也许可以检查一下吗?

我的实现:

// semaphore.h
#include <condition_variable>
#include <mutex>
namespace Semaphore
{
class CountingSemaphore final
{
public:
explicit CountingSemaphore(int initialValue = 0);
void signal();
void wait();
private:
std::mutex _mutex{};
std::condition_variable _conditionVar{};
int _value;
};
} // namespace Semaphore
// semaphore.cpp
#include "semaphore.h"
namespace Semaphore
{
CountingSemaphore::CountingSemaphore(const int initialValue) : _value(initialValue) {}
void CountingSemaphore::signal()
{
std::unique_lock<std::mutex> lock{_mutex};
++_value;
if (0 == _value)
{
lock.unlock();
_conditionVar.notify_one();
}
}
void CountingSemaphore::wait()
{
std::unique_lock<std::mutex> lock{_mutex};
--_value;
if (0 > _value)
{
_conditionVar.wait(lock, [this]() { return (0 <= this->_value); });
}
}
} // namespace Semaphore

此代码已损坏。当前状态机使用负数表示服务员数量,非负数表示剩余容量(0表示没有可用性,但也没有服务员)。

问题是,您只在计数变为零时才通知服务员。因此,如果您有一个初始值为 1(基本上是逻辑互斥锁)的信号量,并且五个线程试图抓取它,一个得到它,另外四个线程等待,此时value-4。当抓住它的线程完成并发出信号时,value上升到-3,但这不是0,所以不调用notify_one

此外,您还有一些冗余代码。std::condition_variablewait的基于谓词的形式等价于:

while (!predicate()) {
wait(lock);
}

所以你的if检查是多余的(wait会在它实际wait之前检查相同的信息,甚至一次,无论如何)。

你也可以通过将增量和递减放在测试它们的同一行来压缩代码(这不是必需的,因为你使用互斥体来保护整个块,而不是依赖于原子学,但我喜欢以一种更容易移植到原子的方式编写线程代码以后,主要是为了在我编写实际原子代码时保持习惯)。修复错误并压缩代码得到以下最终结果:

void CountingSemaphore::signal()
{
std::unique_lock<std::mutex> lock{_mutex};
if (0 >= ++_value) // If we were negative, had at least one waiter, notify one of them; personally, I'd find if (value++ < 0) clearer as meaning "if value *was* less than 0, and also increment it afterwards by side-effect, but I stuck to something closer to your design to avoid confusion
{
lock.unlock();
_conditionVar.notify_one();
}
}
void CountingSemaphore::wait()
{
std::unique_lock<std::mutex> lock{_mutex};
--_value;
_conditionVar.wait(lock, [this]() { return 0 <= this->_value; });
}

另一种方法是采用一种不会将计数降至0以下的设计(因此0表示"有服务员",否则值的范围仅为0initialValue)。这在理论上更安全(你不能通过2 ** (8 * sizeof(int) - 1)服务员来触发环绕)。您可以通过使value成为ssize_t来最小化这种风险(因此 64 位系统遇到错误的可能性会呈指数级降低),或者通过将设计更改为在0处停止:

// value's declaration in header and argument passed to constructor may be changed
// to an unsigned type if you like
void CountingSemaphore::signal()
{
// Just for fun, a refactoring to use lock_guard rather than unique_lock
// since the logical unlock is unconditionally after the value read/modify
int oldvalue;  // Type should match that of value
{
std::lock_guard<std::mutex> lock{_mutex};
oldvalue = value++;
}
if (0 == oldvalue) _conditionVar.notify_one();
}
void CountingSemaphore::wait()
{
std::unique_lock<std::mutex> lock{_mutex};
// Waits only if current count is 0, returns with lock held at which point
// it's guaranteed greater than 0
_conditionVar.wait(lock, [this]() { return 0 != this->_value; });
--value;  // We only decrement when it's positive, just before we return to caller
}

第二种设计有一个小缺陷,因为它在没有可用资源的情况下调用notify_one,但没有可用资源可能意味着"有服务员",或者它可能意味着"所有资源都消耗了,但没有人等待"。不过,从逻辑上讲,这实际上不是问题;在没有服务员的情况下打电话给notify_one是合法的,什么都不做,尽管效率可能略低。在此基础上,您的原始设计可能更可取(除非肯定有服务员,否则不会notify_one)。

最新更新