Rust:安全地终止ThreadPool内正在运行的线程



我遵循了Rust的官方文档,该文档向您展示了如何编写多线程HTTP服务器。他们会引导您构建一个由单个工作者支持的线程池,每个工作者都有自己的线程。线程池将执行Job实例,并通过MPSC实现将它们传递给工作线程。

根据文档,这就是代码的样子,完整的指南在这里。

struct Worker {
id: usize,
thread: thread::JoinHandle<()>
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let job = receiver.lock().unwrap().recv().unwrap();
println!("Receiver {} received a job.", id);
job();
});
Worker { id, thread }
}
fn get_thread(&self) -> &thread::JoinHandle<()> {
return &self.thread;
}
}

我想在Worker实现中添加一个附加函数:shutdown(),即

fn shutdown(&mut self) {
&self.thread.join();
}

我会从主线程调用它,从而在程序结束执行之前等待所有线程完成。然而,上面的关闭功能给了我这个错误:

error: src/main.rs:31: cannot move out of `self.thread` which is behind a mutable reference
error: src/main.rs:31: move occurs because `self.thread` has type `JoinHandle<()>`, which does not implement the `Copy` trait

我宣布自己是&mut self,因为我将是突变的self.thread。我忽略了什么?

认为它告诉我,通过调用&self.thread.join(),我将销毁属于该工作实例的thread::JoinHandle<()>,而我不被允许这样做?那我该如何杀死这根线呢?

其次,我认识到我的线程有一个无限循环,因此从技术上讲,在这个线程上调用join只会导致主线程挂起。我该怎么把线弄断?将共享引用传递给我的Worker,并让Worker检查其值,从而脱离循环?

首先,我猜您有一个ThreadPool结构。您需要为该结构实现Drop特性,以便在整个ThreadPool被丢弃时完成所有线程的连接,而实际上不能在每个工作线程的基础上这样做。

impl Drop for ThreadPool {
fn drop(&mut self) {
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}

take()调用在这里很重要,因为我们想去掉该选项的Some变体。如果线程已经是None,则不需要调用join。

在任何语言中关闭线程池都需要一些中心信号得到所有线程的确认,而不仅仅是一个线程。如果这是C++,您可能会在线程的工作函数顶部检查一个原子布尔,如果它是真的,则返回。虽然你可以在rust中做到这一点,但我认为你想遵循一种更接近书中介绍的项目的方法,即消息。

您希望在通道中使用消息类型,而不是作业,这样您就有两种变体,一种是作业消息,另一种是终止消息。

type Job = Box<dyn FnOnce() + Send + 'static>;
enum Message {
NewJob(Job),
Terminate,
}

你的ThreadPool将是

pub struct ThreadPool {
workers: Vec<Worker>,
sender: mpsc::Sender<Message>,
}

然后,你的worker函数看起来更像这样,类似于这本书的这一部分

impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Message>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv().unwrap();
match message {
Message::NewJob(job) => {
println!("Worker {} got a job; executing.", id);
job();
}
Message::Terminate => {
println!("Worker {} was told to terminate.", id);
break;
}
}
});

现在,您还提到您的程序挂起,无法从外部关闭它。使用信号处理机箱可能比一开始自己实现更容易。一种选择是singal_hook板条箱:

use signal_hook::{iterator::Signals, SIGINT};
use std::{error::Error, thread, time::Duration};
fn main() -> Result<(), Box<dyn Error>> {
let signals = Signals::new(&[SIGINT])?;
thread::spawn(move || {
for sig in signals.forever() {
drop(pool);
}
});
Ok(())
}

当你在键盘上进行ctrl+c操作时,这将终止程序

最新更新