OSX上的应用程序不能生成超过2048个线程



我在OSX上有一个Rust应用程序,启动了大量的线程,如下面的代码所示,然而,在查看了我的OSX版本允许通过sysctl kern.num_taskthreads命令创建的最大线程数之后,我可以看到它是kern.num_taskthreads: 2048,这解释了为什么我不能超过2048个线程。

我怎样才能越过这个硬性限制?

let threads = 300000;
let requests = 1;
for _x in 0..threads {
    println!("{}", _x);
    let request_clone = request.clone();
    let handle = thread::spawn(move || {
        for _y in 0..requests {
            request_clone.lock().unwrap().push((request::Request::new(request::Request::create_request())));
        }
    });
    child_threads.push(handle);
}

在开始之前,我建议您阅读C10K问题。当你达到这个规模时,你需要记住更多的事情。

话虽如此,我还是建议你看看……

一个轻量级的Rust IO库,专注于在操作系统抽象上添加尽可能少的开销。

具体来说,mio提供了一个事件循环,它允许你处理大量的连接而不产生线程。不幸的是,我不知道目前有哪个HTTP库支持mio。你可以创建一个,成为Rust社区的英雄!

不确定这有多大帮助,但我试图创建一个小线程池,它将创建连接,然后通过通道将它们发送到事件循环进行读取。

我确信这段代码可能相当糟糕,但这里无论如何都是作为示例。它使用Hyper库,就像你提到的。

extern crate hyper;
use std::io::Read;
use std::thread;
use std::thread::{JoinHandle};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::channel;
use hyper::Client;
use hyper::client::Response;
use hyper::header::Connection;
const TARGET: i32 = 100;
const THREADS: i32 = 10;
struct ResponseWithString {
    index: i32,
    response: Response,
    data: Vec<u8>,
    complete: bool
}
fn main() {
    // Create a client.
    let url: &'static str = "http://www.gooogle.com/";
    let mut threads = Vec::<JoinHandle<()>>::with_capacity((TARGET * 2) as usize);
    let conn_count = Arc::new(Mutex::new(0));
    let (tx, rx) = channel::<ResponseWithString>();
    for _ in 0..THREADS {
        // Move var references into thread context
        let conn_count = conn_count.clone();
        let tx = tx.clone();
        let t = thread::spawn(move || {
            loop {
                let idx: i32;
                {
                    // Lock, increment, and release
                    let mut count = conn_count.lock().unwrap();
                    *count += 1;
                    idx = *count;
                }
                if idx > TARGET {
                    break;
                }
                let mut client = Client::new();
                // Creating an outgoing request.
                println!("Creating connection {}...", idx);
                let res = client.get(url)                       // Get URL...
                                .header(Connection::close())    // Set headers...
                                .send().unwrap();               // Fire!
                println!("Pushing response {}...", idx);
                tx.send(ResponseWithString {
                    index: idx,
                    response: res,
                    data: Vec::<u8>::with_capacity(1024),
                    complete: false
                }).unwrap();
            }
        });
        threads.push(t);
    }
    let mut responses = Vec::<ResponseWithString>::with_capacity(TARGET as usize);
    let mut buf: [u8; 1024] = [0; 1024];
    let mut completed_count = 0;
    loop {
        if completed_count >= TARGET {
            break; // No more work!
        }
        match rx.try_recv() {
            Ok(r) => {
                println!("Incoming response! {}", r.index);
                responses.push(r)
            },
            _ => { }
        }
        for r in &mut responses {
            if r.complete {
                continue;
            }
            // Read the Response.
            let res = &mut r.response;
            let data = &mut r.data;
            let idx = &r.index;
            match res.read(&mut buf) {
                Ok(i) => {
                        if i == 0 {
                            println!("No more data! {}", idx);
                            r.complete = true;
                            completed_count += 1;
                        }
                        else {
                            println!("Got data! {} => {}", idx, i);
                            for x in 0..i {
                                data.push(buf[x]);
                            }
                        }
                    }
                Err(e) => {
                    panic!("Oh no! {} {}", idx, e);
                }
            }
        }
    }
}

相关内容

  • 没有找到相关文章

最新更新