我在OSX上有一个Rust应用程序,启动了大量的线程,如下面的代码所示,然而,在查看了我的OSX版本允许通过sysctl kern.num_taskthreads
命令创建的最大线程数之后,我可以看到它是kern.num_taskthreads: 2048
,这解释了为什么我不能超过2048个线程。
我怎样才能越过这个硬性限制?
let threads = 300000;
let requests = 1;
for _x in 0..threads {
println!("{}", _x);
let request_clone = request.clone();
let handle = thread::spawn(move || {
for _y in 0..requests {
request_clone.lock().unwrap().push((request::Request::new(request::Request::create_request())));
}
});
child_threads.push(handle);
}
在开始之前,我建议您阅读C10K问题。当你达到这个规模时,你需要记住更多的事情。
话虽如此,我还是建议你看看……
一个轻量级的Rust IO库,专注于在操作系统抽象上添加尽可能少的开销。
具体来说,mio提供了一个事件循环,它允许你处理大量的连接而不产生线程。不幸的是,我不知道目前有哪个HTTP库支持mio。你可以创建一个,成为Rust社区的英雄!
不确定这有多大帮助,但我试图创建一个小线程池,它将创建连接,然后通过通道将它们发送到事件循环进行读取。
我确信这段代码可能相当糟糕,但这里无论如何都是作为示例。它使用Hyper库,就像你提到的。
extern crate hyper;
use std::io::Read;
use std::thread;
use std::thread::{JoinHandle};
use std::sync::{Arc, Mutex};
use std::sync::mpsc::channel;
use hyper::Client;
use hyper::client::Response;
use hyper::header::Connection;
const TARGET: i32 = 100;
const THREADS: i32 = 10;
struct ResponseWithString {
index: i32,
response: Response,
data: Vec<u8>,
complete: bool
}
fn main() {
// Create a client.
let url: &'static str = "http://www.gooogle.com/";
let mut threads = Vec::<JoinHandle<()>>::with_capacity((TARGET * 2) as usize);
let conn_count = Arc::new(Mutex::new(0));
let (tx, rx) = channel::<ResponseWithString>();
for _ in 0..THREADS {
// Move var references into thread context
let conn_count = conn_count.clone();
let tx = tx.clone();
let t = thread::spawn(move || {
loop {
let idx: i32;
{
// Lock, increment, and release
let mut count = conn_count.lock().unwrap();
*count += 1;
idx = *count;
}
if idx > TARGET {
break;
}
let mut client = Client::new();
// Creating an outgoing request.
println!("Creating connection {}...", idx);
let res = client.get(url) // Get URL...
.header(Connection::close()) // Set headers...
.send().unwrap(); // Fire!
println!("Pushing response {}...", idx);
tx.send(ResponseWithString {
index: idx,
response: res,
data: Vec::<u8>::with_capacity(1024),
complete: false
}).unwrap();
}
});
threads.push(t);
}
let mut responses = Vec::<ResponseWithString>::with_capacity(TARGET as usize);
let mut buf: [u8; 1024] = [0; 1024];
let mut completed_count = 0;
loop {
if completed_count >= TARGET {
break; // No more work!
}
match rx.try_recv() {
Ok(r) => {
println!("Incoming response! {}", r.index);
responses.push(r)
},
_ => { }
}
for r in &mut responses {
if r.complete {
continue;
}
// Read the Response.
let res = &mut r.response;
let data = &mut r.data;
let idx = &r.index;
match res.read(&mut buf) {
Ok(i) => {
if i == 0 {
println!("No more data! {}", idx);
r.complete = true;
completed_count += 1;
}
else {
println!("Got data! {} => {}", idx, i);
for x in 0..i {
data.push(buf[x]);
}
}
}
Err(e) => {
panic!("Oh no! {} {}", idx, e);
}
}
}
}
}