Rust中多线程对非重叠非连续索引的无锁处理



我正在练习rust,并决定创建一个矩阵操作/分解项目。

基本上,我希望能够在多个线程中处理底层向量。由于我将为每个线程提供不重叠的索引(这些索引可能是连续的,也可能不是连续的),并且线程将在创建它们的函数结束之前连接,因此不需要锁/同步。

我知道有几个板条箱可以做到这一点,但我想知道是否有一种相对习惯的无板条箱的方法来实现它。

我能想到的最好的是(简化了代码):

use std::thread;
//This represents the Matrix
#[derive(Debug, Clone)]
pub struct MainStruct {
pub data: Vec<f64>,
}
//This is the bit that will be shared by the threads, 
//ideally it should have its lifetime tied to that of MainStruct
//but i have no idea how to make phantomdata work in this case
#[derive(Debug, Clone)]
pub struct SliceTest {
pub data: Vec<SubSlice>,
}
//This struct is to hide *mut f64 to allow it to be shared to other threads
#[derive(Debug, Clone)]
pub struct SubSlice {
pub data: *mut f64,
}
impl MainStruct {
pub fn slice(&mut self) -> (SliceTest, SliceTest) {
let mut out_vec_odd: Vec<SubSlice> = Vec::new();
let mut out_vec_even: Vec<SubSlice> = Vec::new();
unsafe {
let ptr = self.data.as_mut_ptr();
for i in 0..self.data.len() {
let ptr_to_push = ptr.add(i);
//Non contiguous idxs
if i % 2 == 0 {
out_vec_even.push(SubSlice{data:ptr_to_push});
} else {
out_vec_odd.push(SubSlice{data:ptr_to_push});
}
}
}
(SliceTest{data: out_vec_even}, SliceTest{data: out_vec_odd})
}
}
impl SubSlice {
pub fn set(&self, val: f64) {
unsafe {*(self.data) = val;}
}
}
unsafe impl Send for SliceTest {}
unsafe impl Send for SubSlice {}
fn main() {
let mut maindata = MainStruct {
data: vec![0.0, 1.0, 2.0, 3.0, 4.0, 5.0],
};
let (mut outvec1, mut outvec2) = maindata.slice();
let mut threads = Vec::new();
threads.push(
thread::spawn(move || {
for i in 0..outvec1.data.len() {
outvec1.data[i].set(999.9);
}
})
);
threads.push(
thread::spawn(move || {
for i in 0..outvec2.data.len() {
outvec2.data[i].set(999.9);
}
})
);
for handles in threads {
handles.join();
}
println!("maindata = {:?}", maindata.data);
}

编辑:按照下面kmdreko的建议,让代码完全按照我想要的方式工作,而没有使用不安全的代码,耶!

当然,在性能方面,复制f64片可能比创建可变引用向量更便宜,除非你的结构体是由其他结构体填充的,而不是f64

extern crate crossbeam;
use crossbeam::thread;
#[derive(Debug, Clone)]
pub struct Matrix {
data: Vec<f64>,
m: usize, //number of rows
n: usize, //number of cols
}
...
impl Matrix {
...
pub fn get_data_mut(&mut self) -> &mut Vec<f64> {
&mut self.data
}
pub fn calculate_idx(max_cols: usize, i: usize, j: usize) -> usize {
let actual_idx = j + max_cols * i;
actual_idx
}
//Get individual mutable references for contiguous indexes (rows)
pub fn get_all_row_slices(&mut self) -> Vec<Vec<&mut f64>> {
let max_cols = self.max_cols();
let max_rows = self.max_rows();
let inner_data = self.get_data_mut().chunks_mut(max_cols);
let mut out_vec: Vec<Vec<&mut f64>> = Vec::with_capacity(max_rows);
for chunk in inner_data {
let row_vec = chunk.iter_mut().collect();
out_vec.push(row_vec);
}
out_vec
}
//Get mutable references for disjoint indexes (columns)
pub fn get_all_col_slices(&mut self) -> Vec<Vec<&mut f64>> {
let max_cols = self.max_cols();
let max_rows = self.max_rows();
let inner_data = self.get_data_mut().chunks_mut(max_cols);
let mut out_vec: Vec<Vec<&mut f64>> = Vec::with_capacity(max_cols);
for _ in 0..max_cols {
out_vec.push(Vec::with_capacity(max_rows));
}
let mut inner_idx = 0;
for chunk in inner_data {
let row_vec_it = chunk.iter_mut();
for elem in row_vec_it {
out_vec[inner_idx].push(elem);
inner_idx += 1;
}
inner_idx = 0;
}
out_vec
}
...
}
fn test_multithreading() {
fn test(in_vec: Vec<&mut f64>) {
for elem in in_vec {
*elem = 33.3;
}
}
fn launch_task(mat: &mut Matrix, f: fn(Vec<&mut f64>)) {
let test_vec = mat.get_all_row_slices();
thread::scope(|s| {
for elem in test_vec.into_iter() {
s.spawn(move |_| {
println!("Spawning thread...");
f(elem);
});
}
}).unwrap();
}
let rows = 4;
let cols = 3;
//new function code omitted, returns Result<Self, MatrixError>
let mut mat = Matrix::new(rows, cols).unwrap()
launch_task(&mut mat, test);
for i in 0..rows {
for j in 0..cols {
//Requires index trait implemented for matrix
assert_eq!(mat[(i, j)], 33.3);
}
}
}

此API不可靠。由于没有将SliceTestSubSlice绑定到MainStruct的生命周期注释,因此它们可以在数据销毁后保留,如果使用则会导致use-after-free错误。

很容易使它安全;您可以使用.iter_mut()来获取对元素的不同可变引用:

pub fn slice(&mut self) -> (Vec<&mut f64>, Vec<&mut f64>) {
let mut out_vec_even = vec![];
let mut out_vec_odd = vec![];

for (i, item_ref) in self.data.iter_mut().enumerate() {
if i % 2 == 0 {
out_vec_even.push(item_ref);
} else {
out_vec_odd.push(item_ref);
}
}
(out_vec_even, out_vec_odd)
}

然而,这暴露了另一个问题:thread::spawn不能保存对局部变量的引用。创建的线程允许在创建它们的范围之外生存,所以即使您执行了.join(),也不需要这样做。这在你的原始代码中也是一个潜在的问题,只是编译器不能警告它。

没有简单的方法来解决这个问题。你需要使用一种非引用的方式来使用其他线程上的数据,但那将使用Arc,它不允许改变它的数据,所以你必须求助于Mutex,这是你试图避免的。

我建议从crossbeamcrate中找到scope,它允许你生成引用本地数据的线程。我知道你想避免使用板条箱,但在我看来这是最好的解决方案。

查看操场上的工作版本。

:

  • 如何获得多个可变引用元素在Vec?
  • 你可以为线程指定一个非静态生存期吗?

最新更新