当从线程使用互斥锁时,互斥锁从未达到匹配下



我有一个用tokio实现的TCP客户端。我有3个线程在那里,我也有2个Mutex<VecDeque>队列(互斥是tokio::sync::Mutex)。问题是我的锁处理不正确。当我在handle_write方法的match中放入一些println!时,我只看到这个打印!输出→我在这里看到了输出,但是没有打印!match输出:

async fn handle_write(&mut self) {
let stream = Arc::clone(&self.stream);
let session = Arc::clone(&self.session);
let output_queue = Arc::clone(&self.output_queue);
tokio::spawn(async move {
println!("I see output here");
match stream.lock().await.as_mut() {
Some(stream) => {
println!("Stream exists -> BUT no output");
let packet = match output_queue.lock().await.pop_front() {
Some(packet) => match session.lock().await.header_crypt.as_mut() {
Some(header_crypt) => header_crypt.encrypt(&packet),
_ => packet,
},
_ => vec![],
};
if !packet.is_empty() {
println!("My packet do not displayed: {:?}", &packet);
stream.write(&packet).await.unwrap();
stream.flush().await.unwrap();
}
},
_ => {
println!("No stream -> ALSO no output");
},
};
}).await.unwrap()
}

同时我在这里看到输出("Stream exists ->我看到了输出!"):

async fn handle_read(&mut self) {
let queue = Arc::clone(&self.input_queue);
let stream = Arc::clone(&self.stream);
let session = Arc::clone(&self.session);
tokio::spawn(async move {
match stream.lock().await.as_mut() {
Some(stream) => {
println!("Stream exists -> and I see output !");
let mut buffer = [0u8; 4096];
match stream.read(&mut buffer).await {
Ok(bytes_count) => {
println!("{}", &bytes_count);
let raw_data = match session.lock().await.header_crypt.as_mut() {
Some(header_crypt) => header_crypt.decrypt(&buffer[..bytes_count]),
_ => buffer[..bytes_count].to_vec(),
};
queue.lock().await.push_back(raw_data);
},
_ => {},
};
},
_ => {},
};
}).await.unwrap()
}

这个处理程序是这样调用的:

pub async fn handle_connection(&mut self) {
let packet = vec![1, 2, 3, 4, 5];
self.output_queue.lock().await.push_back(packet);
loop {
timeout(Duration::from_millis(TIMEOUT), self.handle_read()).await.unwrap();
timeout(Duration::from_millis(TIMEOUT), self.handle_queue()).await.unwrap();
timeout(Duration::from_millis(TIMEOUT), self.handle_write()).await.unwrap();
}
}

这是游乐场(但是我不明白如何允许tcp连接在那里,也许有人可以在评论中告诉?)。

为什么handle_write的这一部分从来没有到达过?

match stream.lock().await.as_mut() {
Some(stream) => {
println!("Stream exists -> do not output");
let packet = match output_queue.lock().await.pop_front() {
Some(packet) => match session.lock().await.header_crypt.as_mut() {
Some(header_crypt) => header_crypt.encrypt(&packet),
_ => packet,
},
_ => vec![],
};
if !packet.is_empty() {
println!("My packet do not displayed: {:?}", &packet);
stream.write(&packet).await.unwrap();
stream.flush().await.unwrap();
}
},
_ => {
println!("No stream -> do not output");
},
};

你在滥用互斥锁。

技术上的原因是你有一个死锁:在handle_read()self.stream的锁被获取之后,当新数据到达时,你试图锁定self.input_queue,但它已经被handle_queue()锁定,等待handle_read()将数据写入那里。因此,self.stream的锁永远不会被释放,handle_write()也永远没有机会获得它。

背后的原因是您在不应该使用锁的地方使用了锁。不要对读/写对使用互斥锁:写线程可能永远没有机会写,因为读线程持有锁,就像这里发生的那样。您应该使用通道进行内存中的读/写,并使用流分割进行I/O对。

最新更新