我需要让实时(音频)线程在事件发生时向另一个后台线程发出信号。然后,后台"检查器"线程将执行一些昂贵的操作。
因此,我的限制是:
-
通知操作必须是无锁的(等待端没有此类限制)
-
防止检查器线程浪费 CPU 时间,并在不需要时将其正常置于睡眠状态,因为事件之间可能相隔长达几分钟。
- 代码必须是可移植的(osx,windows,android,ios)。
我想出了一个使用条件变量和条件标志的简单解决方案。需要该标志来防止检查器线程在检查器线程开始等待之前发出条件变量信号时等待(可能永远等待)。
struct EventChecker {
std::mutex mtx;
std::condition_variable cv;
std::atomic<bool> flag;
bool alive;
std::thread checkerThread;
EventChecker() {
alive = true;
checkerThread = std::thread([this]() {
while (alive) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this]() { return flag.exchange(false); });
std::cout << " action! ";
}
});
}
void notify() {
flag.store(true);
cv.notify_one();
}
~EventChecker() {
alive = false;
notify();
checkerThread.join();
}
};
如果在标志检查和等待的实际开始之间发生notify()
,此解决方案将死锁(准确地说,检查器线程未唤醒)。如果您认为有条件的wait
的实现只是:
while (!predicate())
wait(lock);
通过这个简单的测试,很容易得到死锁(在Macbook上复制):
int main() {
for (int i = 0; i < 10000; i++) {
EventChecker ec;
}
return 0;
}
据我了解,实际上没有办法在不将mtx
锁定在通知中的情况下使 check+wait 操作原子化,并在等待端添加解锁以减少保留时间,如下所示:
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this]() { return flag.exchange(false); });
// NB: wait() atomically releases lock and blocks the thread
mtx.unlock();
我错过了什么吗?我是否滥用了std::condition_variable
,还有其他更适合我的目的?
对于没有锁的原子检查和等待,您可以使用futex机制。不幸的是,此机制不是任何C/C++标准的一部分,它是低级操作系统支持的一部分:
- Linux(和Android)也有futex系统调用。
- Windows具有WaitOnAddress功能。
我还没有在OSX上找到任何futex的类似物,也不知道它在ios上。
我实现了几个可能的解决方案,需要接受一些根据情况可以接受的妥协,并在周末做了一些测量,因为我喜欢玩得开心。
我重写了EventChecker
,以使用Event
类的三种可能实现:
Event_WithStdMutex
:使用std::mutex
的引用"规范"实现Event_WithLightLock
:它仍然使用互斥锁保护条件,但使用轻量级(准确地说是自旋锁)。这里的想法是,当事件发生相对罕见时,当通知发生时,检查器线程几乎总是已经处于等待状态,因此通知线程获取锁几乎没有开销。Event_WithTimeout
:通知不受锁保护,但我们使用wait_for
让检查器线程在最坏的情况下唤醒(如问题中所述)。当我们需要不惜一切代价快速通知并且我们可以承受以等于最坏情况下wait_for
超时的延迟处理事件时,将使用此方法。超时时间选择是在最坏情况下的响应能力和 CPU 时间节省之间进行权衡。
以下是实现:
struct Event_WithStdMutex {
std::condition_variable_any cv;
std::atomic<bool> condition;
std::mutex mtx;
Event_WithStdMutex() : condition(false) {}
void wait() {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [this]() { return condition.exchange(false); });
}
void notify() {
std::unique_lock<std::mutex> lock(mtx);
condition.store(true);
cv.notify_one();
}
};
struct LightMutex {
std::atomic_flag flag = ATOMIC_FLAG_INIT;
void lock() { while (flag.test_and_set(std::memory_order_acquire)); }
void unlock() { flag.clear(std::memory_order_release); }
};
struct Event_WithLightLock {
std::condition_variable_any cv;
std::atomic<bool> condition;
LightMutex mtx;
Event_WithLightLock() : condition(false) {}
void wait() {
std::unique_lock<LightMutex> lock(mtx);
cv.wait(lock, [this]() { return condition.exchange(false); });
}
void notify() {
std::unique_lock<LightMutex> lock(mtx);
condition.store(true);
cv.notify_one();
}
};
struct Event_WithTimeout {
std::condition_variable cv;
std::atomic<bool> condition;
std::mutex mtx;
std::chrono::milliseconds timeout;
Event_WithTimeout() : condition(false), timeout(10) {}
void wait() {
std::unique_lock<std::mutex> lock(mtx);
cv.wait_for(lock, timeout, [this]() { return condition.exchange(false); });
}
void notify() {
condition.store(true);
cv.notify_one();
}
};
和事件检查器(注意 1 微秒睡眠以"模拟"某种操作):
template <typename Event> struct EventChecker {
bool alive;
std::thread checkerThread;
Event event;
EventChecker() {
alive = true;
checkerThread = std::thread([this]() {
while (alive) {
event.wait();
std::this_thread::sleep_for(std::chrono::microseconds(1)); // comment this for more fun
}
});
}
void notify() {
event.notify();
}
~EventChecker() {
alive = false;
notify();
checkerThread.join();
}
};
以下是我用于测量的函数。notify()
的持续时间既在EventChecker
上下文中测量(线程等待事件),也仅使用 Event(测量没有人等待条件变量的时间):
const int N = 1000000;
template <typename Event> void measureNotify(std::string eventType) {
EventChecker<Event> evChecker;
auto begin = std::chrono::high_resolution_clock::now();
for (int i = 0; i < N; i++){
evChecker.notify();
}
auto dur = std::chrono::high_resolution_clock::now() - begin;
std::cout << "EventChecker (with " << eventType << ") avg notify time: "
<< std::chrono::duration<double, std::nano>(dur).count() / N << " ns n";
Event ev;
begin = std::chrono::high_resolution_clock::now();
for (int i = 0; i < N; i++) {
ev.notify();
}
dur = std::chrono::high_resolution_clock::now() - begin;
std::cout << eventType << " avg notify time (no-one waiting): "
<< std::chrono::duration<double, std::nano>(dur).count() / N << " ns nn";
}
还有测量std::condition_variable
的notify_one()
所花费的时间,没有人等待:
void measureNotifyConditionVariable()
{
std::condition_variable cv;
auto begin = std::chrono::high_resolution_clock::now();
for (int i = 0; i < N; i++){
cv.notify_one();
}
auto dur = std::chrono::high_resolution_clock::now() - begin;
std::cout << "std::condition_variable avg notify time (no-one waiting): "
<< std::chrono::duration<double, std::nano>(dur).count() / N << " ns n";
}
在Macbook Pro(2.2GHz)上运行测试会得到以下结果:
事件检查器(带 Event_WithStdMutex)平均通知时间:157.522 ns
Event_WithStdMutex 平均通知时间(无人等待):67.3699 ns
事件检查器(带 Event_WithLightLock)平均通知时间:67.1347 ns
Event_WithLightLock 平均通知时间(无人等待):61.0349 ns
事件检查器(带 Event_WithTimeout)平均通知时间:23.5722 ns
Event_WithTimeout 平均通知时间(无人等待):22.3806 ns
标准::condition_variable 平均通知时间(无人等待):13.6012 ns
有趣的是,这是删除 1 微秒睡眠时的输出:
事件检查器(带 Event_WithStdMutex)平均通知时间:7346.42 ns
Event_WithStdMutex 平均通知时间(无人等待):66.202 ns
事件检查器(带 Event_WithLightLock)平均通知时间:337.239 ns
Event_WithLightLock平均通知时间(无人等待):61.8729 ns
事件检查器(带 Event_WithTimeout)平均通知时间:46.7398 ns
Event_WithTimeout平均通知时间(无人等待):22.2315 ns
标准::condition_variable 平均通知时间(无人等待):13.3488 ns
当有人等待条件变量时,平均notify()
持续时间会变得更高(在std::mutex
情况下非常高!
我的理解是,发生这种情况是因为通知线程实际上更有可能必须与等待线程争夺锁。