Rust中为线程或函数创建超时的正确方法是什么



这是我的代码:

use std::net;
use std::thread;
fn scan_port(host: &str, port: u16) -> bool {
    let host = host.to_string();
    let port = port;
    let t = thread::spawn(move || net::TcpStream::connect((host.as_str(), port)).is_ok());
    t.join().unwrap()
}

如果连接没有在N秒内完成,我如何创建线程将被终止或终止的情况?

这一切的原因是Rust没有办法设置套接字连接超时,所以我没有办法确保程序不会被卡住。

正如@Shepmaster所指出的:终止线程是个坏主意。

相反,您可以给线程一个Sender,如果它成功地打开了连接,它应该通过它来通知您(甚至可以通过向您发送句柄)。然后,您可以让您的主线程sleep等待您想要等待的时间。当您的线程唤醒时,它会检查相应的Receiver,以从线程中寻找生命迹象。如果线程没有响应,只需通过丢弃JoinHandleReceiver将其释放到野外即可。它并没有消耗cpu时间(它被阻塞了),也没有消耗太多内存。如果它解除锁定,它将检测到Sender未连接,并且可以永远关闭。

当然,你不应该有大量这样的开放线程,因为它们仍然使用资源(内存和系统线程句柄),但在一个正常的系统上,这并不是太大的问题。

示例:

use std::net;
use std::thread;
use std::sync::mpsc;
fn scan_port(host: &str, port: u16) -> bool {
    let host = host.to_string();
    let port = port;
    let (sender, receiver) = mpsc::channel();
    let t = thread::spawn(move || {
        match sender.send(net::TcpStream::connect((host.as_str(), port))) {
            Ok(()) => {}, // everything good
            Err(_) => {}, // we have been released, don't panic
        }
    });
    thread::sleep(std::time::Duration::new(5, 0));
    match receiver.try_recv() {
        Ok(Ok(handle)) => true, // we have a connection
        Ok(Err(_)) => false, // connecting failed
        Err(mpsc::TryRecvError::Empty) => {
            drop(receiver);
            drop(t);
            // connecting took more than 5 seconds
            false
        },
        Err(mpsc::TryRecvError::Disconnected) => unreachable!(),
    }
}

@ker的回答将始终等待5秒,即使连接完成得更快。这里有一种类似的方法,超时和网络请求发生在不同的线程上,第一个完成的线程获胜:

let (sender, receiver) = mpsc::channel();
let tsender = sender.clone();
let t = thread::spawn(move || {
    match sender.send(Ok(net::TcpStream::connect((host.as_str(), port)))) {
        Ok(()) => {}, // everything good
        Err(_) => {}, // we have been released, don't panic
    }
});
let timer = thread::spawn(move || {
  thread::sleep(Duration::from_millis(5000));
  match tsender.send(Err(MyTimeoutError)) {
    Ok(()) => {}, // oops, we timed out
    Err(_) => {}, // great, the request finished already
  }
});
return receiver.recv().unwrap();

但只要你这样做,你还不如用recv_timeout代替:

let (sender, receiver) = mpsc::channel();
let t = thread::spawn(move || {
    match sender.send(net::TcpStream::connect((host.as_str(), port))) {
        Ok(()) => {}, // everything good
        Err(_) => {}, // we have been released, don't panic
    }
});
return receiver.recv_timeout(Duration::from_millis(5000));

这里有一个抽象,因此您可以运行任何带有超时的函数

use std::{
    collections::HashMap,
    sync::mpsc::{self, RecvTimeoutError},
    thread,
    time::Duration,
};
#[derive(Debug)]
struct TimeoutError;
fn run_with_timeout<F, T>(f: F, timeout: Duration) -> Result<T, TimeoutError>
where
    F: FnOnce() -> T + Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    let _ = thread::spawn(move || {
        let result = f();
        match tx.send(result) {
            Ok(()) => {} // everything good
            Err(_) => {} // we have been released, don't panic
        }
    });
    match rx.recv_timeout(timeout) {
        Ok(result) => Ok(result),
        Err(RecvTimeoutError::Timeout) => Err(TimeoutError),
        Err(RecvTimeoutError::Disconnected) => unreachable!(),
    }
}
#[allow(dead_code)]
#[derive(Debug)]
struct Foo {
    bar: String,
    bar_vec: Vec<String>,
    bar_map: HashMap<String, String>,
}
fn return_foo() -> Foo {
    Foo {
        bar: "bar".to_string(),
        bar_vec: vec!["bar".to_string()],
        bar_map: [("bar".to_string(), "bar".to_string())]
            .iter()
            .cloned()
            .collect(),
    }
}
fn main() {
    // This will timeout
    let result = run_with_timeout(
        || {
            thread::sleep(Duration::from_secs(2));
            42
        },
        Duration::from_secs(1),
    );
    println!("Result: {:?}", result);
    // This will not timeout
    let result = run_with_timeout(
        || {
            thread::sleep(Duration::from_secs(2));
            42
        },
        Duration::from_secs(3),
    );
    println!("Result: {:?}", result);
    // This will timeout (Custom type)
    let result = run_with_timeout(
        || {
            thread::sleep(Duration::from_secs(2));
            return_foo()
        },
        Duration::from_secs(1),
    );
    println!("Result: {:?}", result);
    // This will not timeout (Custom type)
    let result = run_with_timeout(
        || {
            thread::sleep(Duration::from_secs(2));
            return_foo()
        },
        Duration::from_secs(3),
    );
    println!("Result: {:?}", result);
}

您可以使用tokio。

use std::net;
use std::time::Duration;
use tokio::time::error::Elapsed;
async fn scan_port(host: &str, port: u16) -> Result<bool, Elapsed> {
    tokio::time::timeout(Duration::from_secs(10), async {
        net::TcpStream::connect((host, port)).is_ok()
    })
    .await
}

相关内容

最新更新