我有一个用tokio
实现的TCP客户端。我有3个线程在那里,我也有2个Mutex<VecDeque>
队列(互斥是tokio::sync::Mutex
)。问题是我的锁处理不正确。当我在handle_write
方法的match
中放入一些println!
时,我只看到这个打印!输出→我在这里看到了输出,但是没有打印!match
输出:
async fn handle_write(&mut self) {
let stream = Arc::clone(&self.stream);
let session = Arc::clone(&self.session);
let output_queue = Arc::clone(&self.output_queue);
tokio::spawn(async move {
println!("I see output here");
match stream.lock().await.as_mut() {
Some(stream) => {
println!("Stream exists -> BUT no output");
let packet = match output_queue.lock().await.pop_front() {
Some(packet) => match session.lock().await.header_crypt.as_mut() {
Some(header_crypt) => header_crypt.encrypt(&packet),
_ => packet,
},
_ => vec![],
};
if !packet.is_empty() {
println!("My packet do not displayed: {:?}", &packet);
stream.write(&packet).await.unwrap();
stream.flush().await.unwrap();
}
},
_ => {
println!("No stream -> ALSO no output");
},
};
}).await.unwrap()
}
同时我在这里看到输出("Stream exists ->我看到了输出!"):
async fn handle_read(&mut self) {
let queue = Arc::clone(&self.input_queue);
let stream = Arc::clone(&self.stream);
let session = Arc::clone(&self.session);
tokio::spawn(async move {
match stream.lock().await.as_mut() {
Some(stream) => {
println!("Stream exists -> and I see output !");
let mut buffer = [0u8; 4096];
match stream.read(&mut buffer).await {
Ok(bytes_count) => {
println!("{}", &bytes_count);
let raw_data = match session.lock().await.header_crypt.as_mut() {
Some(header_crypt) => header_crypt.decrypt(&buffer[..bytes_count]),
_ => buffer[..bytes_count].to_vec(),
};
queue.lock().await.push_back(raw_data);
},
_ => {},
};
},
_ => {},
};
}).await.unwrap()
}
这个处理程序是这样调用的:
pub async fn handle_connection(&mut self) {
let packet = vec![1, 2, 3, 4, 5];
self.output_queue.lock().await.push_back(packet);
loop {
timeout(Duration::from_millis(TIMEOUT), self.handle_read()).await.unwrap();
timeout(Duration::from_millis(TIMEOUT), self.handle_queue()).await.unwrap();
timeout(Duration::from_millis(TIMEOUT), self.handle_write()).await.unwrap();
}
}
这是游乐场(但是我不明白如何允许tcp连接在那里,也许有人可以在评论中告诉?)。
为什么handle_write
的这一部分从来没有到达过?
match stream.lock().await.as_mut() {
Some(stream) => {
println!("Stream exists -> do not output");
let packet = match output_queue.lock().await.pop_front() {
Some(packet) => match session.lock().await.header_crypt.as_mut() {
Some(header_crypt) => header_crypt.encrypt(&packet),
_ => packet,
},
_ => vec![],
};
if !packet.is_empty() {
println!("My packet do not displayed: {:?}", &packet);
stream.write(&packet).await.unwrap();
stream.flush().await.unwrap();
}
},
_ => {
println!("No stream -> do not output");
},
};
你在滥用互斥锁。
技术上的原因是你有一个死锁:在handle_read()
中self.stream
的锁被获取之后,当新数据到达时,你试图锁定self.input_queue
,但它已经被handle_queue()
锁定,等待handle_read()
将数据写入那里。因此,self.stream
的锁永远不会被释放,handle_write()
也永远没有机会获得它。
背后的原因是您在不应该使用锁的地方使用了锁。不要对读/写对使用互斥锁:写线程可能永远没有机会写,因为读线程持有锁,就像这里发生的那样。您应该使用通道进行内存中的读/写,并使用流分割进行I/O对。