当调用Condvar::wait(Condvar, some_mutex)的线程被虚假唤醒时会发生什么?



如果线程正在睡眠,等待notify_one/notify_all调用,但在此发生之前线程被唤醒,等待调用返回还是线程继续等待?

我已经阅读了文档(对于Condvar::wait),它不清楚我是否意味着调用等待将返回一个线程一旦被唤醒:

注意,这个函数容易受到虚假唤醒的影响。条件变量通常有一个布尔谓词,每次函数返回时必须检查谓词,以防止错误的唤醒。

如果wait调用在线程被唤醒后立即返回,那么返回什么?因为它必须返回某种保护:

pub type LockResult<Guard> = Result<Guard, PoisonError<Guard>>;

具体的例子,特别是recv方法:

pub struct Receiver<T> {
shared: Arc<Shared<T>>
}
impl<T> Receiver<T> {
pub fn recv(&self) -> Option<T> {
let mut inner_mutex_guard:MutexGuard<Inner<T>> = self.shared.inner.lock().unwrap();
loop {

match inner_mutex_guard.queue.pop_front() {
Some(t) => return Some(t),
None => {
if inner_mutex_guard.senders == 0 {
return None
} else {
inner_mutex_guard = Condvar::wait(&self.shared.available, inner_mutex_guard).unwrap();
}
}
}
}
}
}

pub struct Inner<T> {
queue: VecDeque<T>,
senders: usize
}  
pub struct Shared<T> {
inner: Mutex<Inner<T>>,
available: Condvar,
}

你的例子是正确的。虽然我会写

inner_mutex_guard = self.shared.available.wait(inner_mutex_guard).unwrap();

不是

inner_mutex_guard = Condvar::wait(&self.shared.available, inner_mutex_guard).unwrap();

如果发生虚假唤醒,循环将再次调用pop_front(),这可能会返回None并进入另一个.wait()

我知道你可能被Condition variables normally have a boolean predicate associated with them弄糊涂了。那是因为你的布尔谓词有点隐藏….pop_front()是否返回Some"。当唤醒发生时,根据需要检查。

.wait()的返回类型是另一个LockGuard。为了让某些事情发生,其他人必须有可能改变锁定的值。因此,该值必须在等待时解锁。

由于在unlock-wait-lock的实现中有几个容易错过的竞争条件,因此通常在单个调用中完成。因此.wait()解锁该值,等待条件发生,然后再次锁定该值。这就是为什么它返回一个新的LockGuard…这是一个新的锁定值,并且可能被更改。

虽然说实话我不确定他们为什么那样做…他们也可以做.wait(&mut LockGuard)而不是.wait(LockGuard) -> LockGuard。但谁知道呢。

编辑:我很确定他们会返回一个锁守卫因为重新锁定可能会失败;因此它们实际上返回的不是LockGuard而是Result<LockGuard>

编辑# 2:它不能通过引用传递,因为在等待期间,没有LockGuard存在。前一个已被删除,新一个仅在数据再次锁定时存在。并且LockGuard没有"空"字。状态(如None),所以它实际上必须被消耗和丢弃。这对于引用来说是不可能的。


CondVar说明

~添加示例前的原始答案~

你必须明白CondVar更多的是一种优化。它防止了对某个条件的空闲等待。这就是为什么它需要一个Guard,它应该与一个锁定的值相结合,你想要观察变化。所以如果它被唤醒,检查值是否改变,然后回到waitwait是在循环中调用的。

下面是一个例子:

use std::{
sync::{Arc, Condvar, Mutex},
thread::{sleep, JoinHandle},
time::Duration,
};
struct Counter {
value: Mutex<u32>,
condvar: Condvar,
}
fn start_counting_thread(counter: Arc<Counter>) -> JoinHandle<()> {
std::thread::spawn(move || loop {
sleep(Duration::from_millis(100));
let mut value = counter.value.lock().unwrap();
*value += 1;
counter.condvar.notify_all();
if *value > 15 {
break;
}
})
}
fn main() {
let counter = Arc::new(Counter {
value: Mutex::new(0),
condvar: Condvar::new(),
});
let counting_thread = start_counting_thread(counter.clone());
// Wait until the value more than 10
let mut value = counter.value.lock().unwrap();
while *value <= 10 {
println!("Value is {value}, waiting ...");
value = counter.condvar.wait(value).unwrap();
}
println!("Condition met. Value is now {}.", *value);
// Unlock value
drop(value);
// Wait for counting thread to finish
counting_thread.join().unwrap();
}
Value is 0, waiting ...
Value is 1, waiting ...
Value is 2, waiting ...
Value is 3, waiting ...
Value is 4, waiting ...
Value is 5, waiting ...
Value is 6, waiting ...
Value is 7, waiting ...
Value is 8, waiting ...
Value is 9, waiting ...
Value is 10, waiting ...
Condition met. Value is now 11.

如果您不想手动实现循环,而只是等待,直到满足条件,使用wait_while:

use std::{
sync::{Arc, Condvar, Mutex},
thread::{sleep, JoinHandle},
time::Duration,
};
struct Counter {
value: Mutex<u32>,
condvar: Condvar,
}
fn start_counting_thread(counter: Arc<Counter>) -> JoinHandle<()> {
std::thread::spawn(move || loop {
sleep(Duration::from_millis(100));
let mut value = counter.value.lock().unwrap();
*value += 1;
counter.condvar.notify_all();
if *value > 15 {
break;
}
})
}
fn main() {
let counter = Arc::new(Counter {
value: Mutex::new(0),
condvar: Condvar::new(),
});
let counting_thread = start_counting_thread(counter.clone());
// Wait until the value more than 10
let mut value = counter.value.lock().unwrap();
value = counter.condvar.wait_while(value, |val| *val <= 10).unwrap();
println!("Condition met. Value is now {}.", *value);
// Unlock value
drop(value);
// Wait for counting thread to finish
counting_thread.join().unwrap();
}
Condition met. Value is now 11.

最新更新