在 Rust 中以任意顺序窃取并行工作



我正在尝试编写一个并行数据加载器,用于在 Rust 中进行深度学习。任务是编写一个迭代器,该迭代器在后台执行以下操作

  1. 从磁盘读取文件并对其应用一些计算密集型预处理,结果通常是一个数字数组(或多个)
  2. 将上一步的结果分组为大小B的批次并"整理"它们 - 这通常意味着只是连接数组 - 中等计算量
  3. 生成步骤 2 的结果。

步骤 1 可以同时受 IO 和计算限制,具体取决于网络延迟、文件大小和预处理的复杂性。它必须由许多工人并行运行。步骤 2 应该脱离主线程,但可能不需要工作线程池。第 3 步发生在主线程(暴露给 Python)上。

我在 Rust 中编写它的原因是 Python 提供了两个选项:基于multiprocessingPyTorch 附带的纯 Python 实现,它有点慢但非常灵活(任意用户定义的数据预处理和批处理)和 Tensorflow 附带C++实现,它由用户从一组预定义的原语组装而成。后者要快得多,但对于我希望做的数据处理类型来说限制太多了。我希望 Rust 能像 PyTorch 那样为我提供 Tensorflow 的速度和任意代码的灵活性。

我的问题纯粹是关于实现并行性的方法。理想的设置是步骤 1) -> 通道 -> 步骤 2) -> 通道 ->步骤 3 有 N 个工作线程。由于迭代器对象可能随时被丢弃,因此严格要求能够在Drop后终止整个方案。另一方面,可以灵活地以任意顺序加载文件:例如,如果批量大小B == 16max_n_threads == 32,则完全可以启动 32 个工作线程并生成包含碰巧首先返回的 16 个示例的第一个批处理。这可以用来提高速度。

我的天真实现通过 3 个步骤创建DataLoader

  1. 创建一个n_working: Arc<AtomicUsize>来控制活动的工作线程数,并should_shutdown: Arc<AtomicBool>发出信号关闭(调用Drop时)
  2. 创建一个负责维护池的线程。它在n_working < max_n_threads上旋转并不断生成在should_shutdown终止的工作线程,否则获取单个示例,将其发送到worker->batcher通道并递减n_working
  3. 创建一个批处理线程,该线程轮询工作器>批处理器通道,在收到对象B后将它们连接成批处理并向下发送批处理器>生成器通道
#[pyclass]
struct DataLoader {
collate_worker: Option<thread::JoinHandle<()>>,
example_worker: Option<thread::JoinHandle<()>>,
should_shut_down: Arc<AtomicBool>,
receiver: Receiver<Batch>,
length: usize,
}
impl DataLoader {
fn new(
dataset: Dataset,
batch_size: usize,
capacity: usize,
) -> Self {
let n_batches = dataset.len() / batch_size;
let max_n_threads = capacity * batch_size;
let (example_sender, collate_receiver) = bounded((batch_size - 1) * capacity);
let should_shut_down = Arc::new(AtomicBool::new(false));
let shutdown_flag = should_shut_down.clone();
let example_worker = thread::spawn(move || {
rayon::scope_fifo(|s| {
let dataset = &dataset;
let n_working = Arc::new(AtomicUsize::new(0));
let mut current_index = 0;
while current_index < n_batches * batch_size {
if n_working.load(Ordering::Relaxed) == max_n_threads {
continue;
}
if shutdown_flag.load(Ordering::Relaxed) {
break;
}
let index = current_index.clone();
let sender = example_sender.clone();
let counter = n_working.clone();
let shutdown_flag = shutdown_flag.clone();
s.spawn_fifo(move |_s| {
let example = dataset.get_example(index);
if !shutdown_flag.load(Ordering::Relaxed) {
_ = sender.send(example);
} // if we should shut down, skip sending
counter.fetch_sub(1, Ordering::Relaxed);
});
current_index += 1;
n_working.fetch_add(1, Ordering::Relaxed);
};
});
});
let (batch_sender, final_receiver) = bounded(capacity);
let shutdown_flag = should_shut_down.clone();
let collate_worker = thread::spawn(move || {
'outer: loop {
let mut batch = vec![];
for _ in 0..batch_size {
if let Ok(example) = collate_receiver.recv() {
batch.push(example);
} else {
break 'outer;
}
};
let collated = collate(batch);
if shutdown_flag.load(Ordering::Relaxed) {
break; // skip sending
}
_ = batch_sender.send(collated);
};
});

Self {
collate_worker: Some(collate_worker),
example_worker: Some(example_worker),
should_shut_down: should_shut_down,
receiver: final_receiver,
length: n_batches,
}
}
}
#[pymethods]
impl DataLoader {
fn __iter__(slf: PyRef<Self>) -> PyRef<Self> { slf }
fn __next__(&mut self) -> Option<Batch> {
self.receiver.recv().ok() 
}
fn __len__(&self) -> usize {
self.length
}
}
impl Drop for DataLoader {
fn drop(&mut self) {
self.should_shut_down.store(true, Ordering::Relaxed);
if self.collate_worker.take().unwrap().join().is_err() {
println!("Panic in collate worker");
};
if self.example_worker.take().unwrap().join().is_err() {
println!("Panic in example_worker");
};
println!("dropped the dataloader");
}
}

这种实现可以工作,并且与 PyTorch 的性能大致匹配,但没有提供显着的加速。我不知道在哪里寻找改进,但我想这将有助于以窃取工作的方式自动平衡事物,并根据 IO 的比例和计算时间灵活地生成工作线程。我还预计由于旋转池管理器和处理Drop时可能出现的极端情况而导致的性能问题。

我的问题是如何最好地解决这个问题。我通常不确定这是否应该用像rayon这样的平行板条箱、像tokio这样的异步板条箱或两者的混合来解决。我也有预感,通过正确使用他们的组合器/高阶 API,我的实现可能会简单得多。我尝试了rayon但我无法获得不会浪费地执行原始顺序返回订单并尊重Drop要求的解决方案。

好的,我想我已经为您找到了一个使用人造丝并行迭代器的解决方案。

诀窍是在人造丝迭代器中使用Results,如果设置了取消标志,则返回Err

我首先创建了一个实用程序类型来创建一个可取消的线程,您可以在其中执行 rayon 迭代器。您可以通过传入线程闭包来使用它,该线程闭包将原子取消令牌作为参数。然后你必须检查取消令牌是否true,如果是,请提前退出。

use std::sync::Arc;
use std::sync::atomic::{Ordering, AtomicBool};
use std::thread::JoinHandle;
fn collate(batch: &[Computed]) -> Batch {
batch.iter().map(|&x| i128::from(x)).sum()
}
#[derive(Debug)]
struct Cancelled;
struct CancellableThread<Output: Send + 'static> {
cancel_token: Arc<AtomicBool>,
thread: Option<JoinHandle<Result<Output, Cancelled>>>,
}
impl<Output: Send + 'static> CancellableThread<Output> {
fn new<F: FnOnce(Arc<AtomicBool>) -> Result<Output, Cancelled> + Send + 'static>(init: F) -> Self {
let cancel_token = Arc::new(AtomicBool::new(false));
let thread_cancel_token = Arc::clone(&cancel_token);

CancellableThread {
thread: Some(std::thread::spawn(move || init(thread_cancel_token))),
cancel_token,
}
}

fn output(mut self) -> Output {
self.thread.take().unwrap().join().unwrap().unwrap()
}
}
impl<Output: Send + 'static> Drop for CancellableThread<Output> {
fn drop(&mut self) {
self.cancel_token.store(true, Ordering::Relaxed);

if let Some(thread) = self.thread.take() {
let _ = thread.join().unwrap();
}
}
}

我发现创建一个返回Result<(), Cancelled>的闭包很有用,这样我就可以使用 try 运算符 (?) 提前退出。

CancellableThread::new(move |cancel_token| {
let cancelled = || if cancel_token.load(Ordering::Relaxed) {
Err(Cancelled)
} else {
Ok(())
};
loop {
// was the thread dropped?
// if so, stop what we're doing
cancelled?;
// do stuff and 
// eventually return a result
}
});

然后我在DataLoader中使用了该CancellableThread抽象。无需为其创建特殊的Dropimpl,因为默认情况下,无论如何,它都会在每个字段上调用drop,这将处理取消。

type Data = Vec<u8>;
type Dataset = Vec<Data>;
type Computed = u64;
type Batch = i128;
use rayon::prelude::*;
use crossbeam::channel::{unbounded, Receiver};
struct DataLoader {
example_worker: CancellableThread<()>,
collate_worker: CancellableThread<()>,
receiver: Receiver<Batch>,
length: usize,
}

我使用了unbounded频道,因为这少了一件需要烦恼的事情。切换到bounded应该不难。

impl DataLoader {
fn new(dataset: Dataset, batch_size: usize) -> Self {
let (example_sender, collate_receiver) = unbounded();
let (batch_sender, final_receiver) = unbounded();

我不确定您是否总是可以保证数据集中的项目数是batch_size的倍数,所以我决定明确处理这个问题。

let length = if dataset.len() % batch_size == 0 {
dataset.len() / batch_size
} else {
dataset.len() / batch_size + 1
};

我首先创建了整理工作线程,尽管这可能不是必需的。如您所见,我必须复制一点来处理部分批次。

let collate_worker = CancellableThread::new(move |cancel_token| {
let cancelled = || if cancel_token.load(Ordering::Relaxed) {
Err(Cancelled)
} else {
Ok(())
};

'outer: loop {
let mut batch = Vec::with_capacity(batch_size);
for _ in 0..batch_size {
cancelled()?;

if let Ok(data) = collate_receiver.recv() {
batch.push(data);
} else {
if !batch.is_empty() {
// handle the last batch, if there
// weren't enough items to fill it
let collated = collate(&batch);
cancelled()?;
batch_sender.send(collated).unwrap();
}

break 'outer;
}
}

let collated = collate(&batch);
cancelled()?;
batch_sender.send(collated).unwrap();
}

Ok(())
});

示例 worker 是真正让事情变得简单得多的地方,因为我们可以使用 rayon 并行迭代器。如您所见,我们会在每次繁重计算之前检查取消。

let example_worker = CancellableThread::new(move |cancel_token| {
let cancelled = || if cancel_token.load(Ordering::Relaxed) {
Err(Cancelled)
} else {
Ok(())
};

let heavy_compute = |data: Data| -> Result<Computed, Cancelled> {
cancelled()?;

Ok(data.iter().map(|&x| u64::from(x)).product())
};

dataset
.into_par_iter()
.map(heavy_compute)
.try_for_each(|computed| {
example_sender.send(computed?).unwrap();

Ok(())
})
});

然后我们只是构建DataLoader.你可以看到 Python impl 是相同的:

DataLoader {
example_worker,
collate_worker,
receiver: final_receiver,
length,
}
}
}
// #[pymethods]
impl DataLoader {
fn __iter__(this: Self /* PyRef<Self> */) -> Self /* PyRef<Self> */ { this }

fn __next__(&mut self) -> Option<Batch> {
self.receiver.recv().ok() 
}

fn __len__(&self) -> usize {
self.length
}
}

操场

最新更新