使用人造丝进行并行文件处理



我在本地源文件夹中有7个CSV文件(每个55 MB),我想将其转换为JSON格式并存储到本地文件夹中。我的操作系统是MacOS(四核英特尔i5)。基本上,它是一个简单的Rust程序,作为从控制台运行

./target/release/convert <source-folder> <target-folder>

我使用Rust线程的多线程方法是严格遵循

fn main() -> Result<()> {
let source_dir = PathBuf::from(get_first_arg()?);
let target_dir = PathBuf::from(get_second_arg()?);
let paths = get_file_paths(&source_dir)?;
let mut handles = vec![];
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir)?;
handles.push(thread::spawn(move || {
let _ = convert(&source_path, &target_path);
}));
}
for h in handles {
let _ = h.join();
}
Ok(())
}

我使用time运行它来测量CPU利用率,从而得出

2.93s user 0.55s system 316% cpu 1.098 total

然后,我尝试使用rayon(线程池)机箱来实现相同的任务:

fn main() -> Result<()> {
let source_dir = PathBuf::from(get_first_arg()?);
let target_dir = PathBuf::from(get_second_arg()?);
let paths = get_file_paths(&source_dir)?;
let pool = rayon::ThreadPoolBuilder::new().num_threads(15).build()?;
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir)?;
pool.install(|| {
let _ = convert(&source_path, &target_path);
});
}
Ok(())
}

我使用time运行它来测量CPU利用率,从而得出

2.97s user 0.53s system 98% cpu 3.561 total

我用人造丝时看不出有什么改进。我可能用错人造丝了。有人知道它出了什么问题吗?

更新(4月9日)

在与Rust检查器进行了一段时间的斗争后,只想分享一个解决方案,也许它可以帮助其他人,或者任何其他人都可以建议更好的方法/解决方案

pool.scope(move |s| {
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir).unwrap();
s.spawn(move |_s| {
convert(&source_path, &target_path).unwrap();
});
}
});

但仍然无法击败对113个文件使用ruststd::thread的方法。

46.72s user 8.30s system 367% cpu 14.955 total

更新(4月10日)

在@maxy评论之后

// rayon solution
paths.into_par_iter().for_each(|source_path| {
let target_path = create_target_file_path(&source_path, &target_dir);
match target_path {
Ok(target_path) => {
info!(
"Processing {}",
target_path.to_str().unwrap_or("Unable to convert")
);
let res = convert(&source_path, &target_path);
if let Err(e) = res {
error!("{}", e);
}
}
Err(e) => error!("{}", e),
}
});
// std::thread solution
let mut handles = vec![];
for source_path in paths {
let target_path = create_target_file_path(&source_path, &target_dir)?;
handles.push(thread::spawn(move || {
let _ = convert(&source_path, &target_path);
}));
}
for handle in handles {
let _ = handle.join();
}

57个文件的比较:

std::threads: 23.71s user 4.19s system 356% cpu 7.835 total
rayon:        23.36s user 4.08s system 324% cpu 8.464 total

人造丝安装的docu不是非常清楚,但签名:

pub fn install<OP, R>(&self, op: OP) -> R where
R: Send,
OP: FnOnce() -> R + Send, 

表示它返回类型CCD_ 5。与闭包返回的类型R相同。所以显然install()必须等待结果。

只有当闭包派生出额外的任务时,例如通过在闭包中使用.par_iter(),这才有意义。我建议在文件列表上直接使用reson的并行迭代器(而不是for循环)。您甚至不需要创建自己的线程池,默认池通常是可以的。

如果您坚持手动执行,则必须使用spawn()而不是install。您可能需要将循环移动到传递给scope()的lambda中。

最新更新