如何在不使用互斥的情况下从任意索引的多个线程写入可变切片



我有两个从另一个方法传递的切片:

fn example<T>(a1: &[T], a2: &mut [T]) {}

我想用多个线程处理a1,然后使用完全任意的索引写入a2,这些索引只有在执行每个线程时才知道。我的算法保证索引是互斥的,因此不存在数据竞争。

借用检查器不喜欢在线程之间共享可变引用,因为它不知道我们的算法所做的保证。我还得到了一个lifetime 'static required rustc (E0621)错误。

那么如何在Rust中做到这一点呢?

的答案

  • 如何将对堆栈变量的引用传递给线程
  • 对保证不相交的大向量的任意索引的同时可变访问
  • 如何将向量中不相交的切片传递给不同的线程
  • 如何在分区数组上运行并行计算线程
  • 如何同时获得对两个数组元素的可变引用

不要回答我的问题。

第一个问题的答案解决了作用域问题,但没有解决访问任意相互不相交索引的问题。第二个问题的答案建议使用as_slice_of_cells,但由于上述原因,即任意访问,这在这里不起作用。第三个问题的答案类似地建议as_slice_of_cells,但同样,数组可以拆分为不相交部分的假设在这里无法实现。第四个问题再次询问关于对数组进行分区的问题,而我们在这里无法做到这一点。第五个问题也是如此。

范围界定问题的一个答案(https://stackoverflow.com/a/64502824/10056727)实际上,试图解决这个问题,但它不建议使用横梁,并且建议的替代方案比这里的顶级答案更不安全。

在尝试实现算法时遇到两个不同的问题:

  1. 使用std::thread::spawn无法在线程之间共享非'static引用
  2. 只有通过将切片分割成多个较小的切片并将每个分割的切片专门分配给每个线程,才能安全地在没有同步的情况下写入切片中相互不相交的索引

通过使用crossbeam::scope而不是std::thread::spawn来生成线程,可以很容易地避免第一个问题。然而,后一个问题需要一个不安全的解决方案。但是,由于您知道索引是相互不相交的,因此在实践中不存在数据争用,并且您可以使用UnsafeCell向编译器断言不存在数据竞争。要对切片执行此操作,可以使用以下实用程序:

use std::cell::UnsafeCell;
#[derive(Copy, Clone)]
pub struct UnsafeSlice<'a, T> {
slice: &'a [UnsafeCell<T>],
}
unsafe impl<'a, T: Send + Sync> Send for UnsafeSlice<'a, T> {}
unsafe impl<'a, T: Send + Sync> Sync for UnsafeSlice<'a, T> {}
impl<'a, T> UnsafeSlice<'a, T> {
pub fn new(slice: &'a mut [T]) -> Self {
let ptr = slice as *mut [T] as *const [UnsafeCell<T>];
Self {
slice: unsafe { &*ptr },
}
}

/// SAFETY: It is UB if two threads write to the same index without
/// synchronization.
pub unsafe fn write(&self, i: usize, value: T) {
let ptr = self.slice[i].get();
*ptr = value;
}
}

该实用程序允许您将独占切片&mut [T]转换为可以共享但仍用于突变的切片。当然,这意味着,如果多个线程在没有同步的情况下写入同一索引,那么写入它可能会导致数据竞争。因此,write方法是不安全的,如果违反的假设,将导致UB

UnsafeSlice实用程序在使用时仍然保证在出现自由或越界错误后不会使用。UnsafeSlice只会关闭对比赛条件的验证。

要查看构造函数中的转换是否正确,请参阅Cell::from_mutCell::as_slice_of_cells方法内部的安全注释。

如何在Rust中做到这一点?诉诸于不安全是可以的。

如果需要两个线程在没有锁的情况下对相同的数据进行变异,则必须使用unsafe。甚至存在对同一数据的两个可变引用(&mut(都是未定义的行为,因此您需要通过*mut原始指针访问数据,同时要非常小心地避免数据竞争。

但是,根据您的算法,您可以通过创建与split_at_mut不重叠的&mut引用来避免unsafe

下面是一个代码,它使用Vector或AtomicI32放入Arc中来执行您需要的操作:

use std::sync::Arc;
use std::thread;
use std::sync::atomic::AtomicI32;
use std::sync::atomic::Ordering;
// ==================================================================
// ==================================================================
const SIZE: usize = 4;
// ==================================================================
// ==================================================================
fn new_worker( id: i32, // id for the thread
a_data: Arc::<Vec<AtomicI32>> // Vec of i32 inside an arc-ptr
) -> thread::JoinHandle<()> {
// create new thread
// a_data is moved into the thread
let hand = thread::spawn(move || {
// a_data[id] <- 2 * id * a_data[id]
// use load() for readng and store() for writing
a_data[id as usize].store( 
2 * id *
a_data[id as usize].load( Ordering::Relaxed ),
Ordering::Relaxed );
});
return hand;
} // ()
// ==================================================================
// ==================================================================
fn main() {
let mut data : Vec<AtomicI32> = Vec::new(); //  vector of i32
// populate the vector
for _i in 0..SIZE {
data.push( AtomicI32::new( 1 ) );
} // for
// put the array into an arc-ptr: move to the heap, share arc-ptr
let data_arc = Arc::new(data);
// share the data through the arc-ptr
let hand1 = new_worker(1, data_arc.clone());
let hand2 = new_worker(3, data_arc.clone());
// wait for the threads to end
hand1.join().unwrap();
hand2.join().unwrap();
// print their work
println!( "2 == {:?}", data_arc[1].load( Ordering::Relaxed ) );
println!( "6 == {:?}", data_arc[3].load( Ordering::Relaxed ) );
} // ()

更新的答案,之前生成的UB(正如Chayim Friedman所指出的(。人们说编译器在处理原始指针时不会基于混叠做出优化假设。如果是这样的话,那么下面的方法不应该导致UB,但我不能100%确定:

unsafe fn example<T>(a1: *const T, a1_len: usize, a2: *const T, a2_len: usize) {
let a2 = std::slice::from_raw_parts_mut(a2 as *mut T, a2_len);
let a1 = std::slice::from_raw_parts(a1, a1_len);
...
}

相关内容

最新更新