How do I run parallel threads of computation on a partitioned array?



I'm trying to distribute an array across threads and have the threads sum their portions of the array in parallel. I want thread 0 to sum elements 0, 1, 2; thread 1 to sum elements 3, 4, 5; thread 2 to sum elements 6 and 7; and thread 3 to sum elements 8 and 9.

I'm new to Rust, but I've coded in C/C++/Java before. I've really thrown everything and the kitchen sink at this, and I'm hoping I can get some guidance.

Sorry the code is so sloppy; I'll clean it up once it's working. Please ignore all the badly named variables, inconsistent spacing, and so on.

use std::io;
use std::rand;
use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread::Thread;
static NTHREADS: usize = 4;
static NPROCS: usize = 10;
fn main() {
    let mut a = [0; 10]; // a: [i32; 10]
    let mut endpoint = a.len() / NTHREADS;
    let mut remElements = a.len() % NTHREADS;
    for x in 0..a.len() {
        let secret_number = (rand::random::<i32>() % 100) + 1;
        a[x] = secret_number;
        println!("{}", a[x]);
    }
    let mut b = a;
    let mut x = 0;
    check_sum(&mut a);
    // serial_sum(&mut b);
    // Channels have two endpoints: the `Sender<T>` and the `Receiver<T>`,
    // where `T` is the type of the message to be transferred
    // (type annotation is superfluous)
    let (tx, rx): (Sender<i32>, Receiver<i32>) = mpsc::channel();
    let mut scale: usize = 0;
    for id in 0..NTHREADS {
        // The sender endpoint can be copied
        let thread_tx = tx.clone();
        // Each thread will send its id via the channel
        Thread::spawn(move || {
            // The thread takes ownership over `thread_tx`
            // Each thread queues a message in the channel
            let numTougherThreads: usize = NPROCS % NTHREADS;
            let numTasksPerThread: usize = NPROCS / NTHREADS;
            let mut lsum = 0;
            if id < numTougherThreads {
                let mut q = numTasksPerThread+1;
                lsum = 0;
                while q > 0 {
                    lsum = lsum + a[scale];
                    scale+=1;
                    q = q-1;
                }
                println!("Less than numToughThreads lsum: {}", lsum);
            }
            if id >= numTougherThreads {
                let mut z = numTasksPerThread;
                lsum = 0;
                while z > 0 {
                    lsum = lsum + a[scale];
                    scale +=1;
                    z = z-1;
                }    
                println!("Greater than numToughthreads lsum: {}", lsum);
            }
            // Sending is a non-blocking operation, the thread will continue
            // immediately after sending its message
            println!("thread {} finished", id);
            thread_tx.send(lsum).unwrap();
        });
    }
    // Here, all the messages are collected
    let mut globalSum = 0;
    let mut ids = Vec::with_capacity(NTHREADS);
    for _ in 0..NTHREADS {
        // The `recv` method picks a message from the channel
        // `recv` will block the current thread if there are no messages available
        ids.push(rx.recv());
    }
    println!("Global Sum: {}", globalSum);
    // Show the order in which the messages were sent
    println!("ids: {:?}", ids);
}
fn check_sum (arr: &mut [i32]) {
    let mut sum = 0;
    let mut i = 0;
    let mut size = arr.len();
    loop {
        sum += arr[i];
        i+=1;
        if i == size { break; }
    }
    println!("CheckSum is {}", sum);
}

This is as far as I've gotten. I can't understand why threads 0 and 1 end up with the same sum, and threads 2 and 3 end up with the same sum:

 -5
 -49
 -32
 99
 45
 -65
 -64
 -29
 -56
 65
 CheckSum is -91
 Greater than numTough lsum: -54
 thread 2 finished
 Less than numTough lsum: -86
 thread 1 finished
 Less than numTough lsum: -86
 thread 0 finished
 Greater than numTough lsum: -54
 thread 3 finished
 Global Sum: 0
 ids: [Ok(-86), Ok(-86), Ok(-54), Ok(-54)]

I managed to get it to work out evenly using the code below:

    while q > 0 {
        if id*s+scale == a.len() { break; }
        lsum = lsum + a[id*s+scale];
        scale +=1;
        q = q-1;
    }
    println!("Less than numToughThreads lsum: {}", lsum);
}
if id >= numTougherThreads {
    let mut z = numTasksPerThread;
    lsum = 0;
    let mut scale = 0;
    while z > 0 {
        if id*numTasksPerThread+scale == a.len() { break; }
        lsum = lsum + a[id*numTasksPerThread+scale];
        scale = scale + 1;
        z = z-1;
    }

Welcome to Rust! :)

Yeah, at first I didn't realize every thread got its own copy of `scale`.

Not only that! It also gets its own copy of `a`!
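To make that concrete, here is a minimal standalone sketch (not part of the original code) showing that `[i32; 10]` is `Copy`, so a `move` closure silently grabs its own copy of the whole array and leaves `main`'s array untouched:

use std::thread;

fn main() {
    let a = [1i32; 10]; // arrays of Copy elements are themselves Copy

    let handle = thread::spawn(move || {
        // `move` captured a *copy* of `a`; mutating that copy here
        // has no effect on the array back in `main`
        let mut local = a;
        local[0] = 99;
        local[0]
    });

    println!("thread's copy saw {}", handle.join().unwrap()); // 99
    println!("main's array still has {}", a[0]);              // 1
}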

What you're trying to do is probably best done something like the code below. I figured it's easier for you to see a complete working example, since you seem to be a Rust beginner and asked for guidance. I deliberately replaced the `[i32; 10]` with a `Vec`, because a `Vec` is not implicitly `Copy`: it needs an explicit `clone()`, so we can't copy it by accident. Take note of all the bigger and smaller differences. The code is also a bit more functional in style (less `mut`). I commented on most of the noteworthy things:

extern crate rand;
use std::sync::Arc;
use std::sync::mpsc;
use std::thread;
const NTHREADS: usize = 4; // I replaced `static` by `const`
// gets used for *all* the summing :)
fn sum<I: Iterator<Item=i32>>(iter: I) -> i32 {
    let mut s = 0;
    for x in iter {
        s += x;
    }
    s
}
fn main() {
    // We don't want to clone the whole vector into every closure.
    // So we wrap it in an `Arc`. This allows sharing it.
    // I also got rid of `mut` here by moving the computations into
    // the initialization.
    let a: Arc<Vec<_>> =
        Arc::new(
            (0..10)
                .map(|_| {
                    (rand::random::<i32>() % 100) + 1
                })
                .collect()
        );
    let (tx, rx) = mpsc::channel(); // types will be inferred
    { // local scope, we don't need the following variables outside
        let num_tasks_per_thread = a.len() / NTHREADS; // same here
        let num_tougher_threads = a.len() % NTHREADS; // same here
        let mut offset = 0;
        for id in 0..NTHREADS {
            let chunksize =
                if id < num_tougher_threads {
                    num_tasks_per_thread + 1
                } else {
                    num_tasks_per_thread
                };
            let my_a = a.clone();  // refers to the *same* `Vec`
            let my_tx = tx.clone();
            thread::spawn(move || {
                let end = offset + chunksize;
                let partial_sum =
                    sum( (&my_a[offset..end]).iter().cloned() );
                my_tx.send(partial_sum).unwrap();
            });
            offset += chunksize;
        }
    }
    // We can close this Sender
    drop(tx);
    // Iterator magic! Yay! global_sum does not need to be mutable
    let global_sum = sum(rx.iter());
    println!("global sum via threads    : {}", global_sum);
    println!("global sum single-threaded: {}", sum(a.iter().cloned()));
}

Using a crate like crossbeam, you can write the following code:

use crossbeam; // 0.7.3
use rand::distributions::{Distribution, Uniform}; // 0.7.3
const NTHREADS: usize = 4;
fn random_vec(length: usize) -> Vec<i32> {
    let step = Uniform::new_inclusive(1, 100);
    let mut rng = rand::thread_rng();
    step.sample_iter(&mut rng).take(length).collect()
}
fn main() {
    let numbers = random_vec(10);
    let num_tasks_per_thread = numbers.len() / NTHREADS;
    crossbeam::scope(|scope| {
        // The `collect` is important to eagerly start the threads!
        let threads: Vec<_> = numbers
            .chunks(num_tasks_per_thread)
            .map(|chunk| scope.spawn(move |_| chunk.iter().cloned().sum::<i32>()))
            .collect();
        let thread_sum: i32 = threads.into_iter().map(|t| t.join().unwrap()).sum();
        let no_thread_sum: i32 = numbers.iter().cloned().sum();
        println!("global sum via threads    : {}", thread_sum);
        println!("global sum single-threaded: {}", no_thread_sum);
    })
    .unwrap();
}

Scoped threads allow you to pass in a reference that is guaranteed to outlive the threads. You can then use the threads' return values directly, skipping the channel (which is nice, it's just not needed here!).
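For reference, the same idea now also works without any external crate: `std::thread::scope` (stable since Rust 1.63) provides scoped threads in the standard library. A minimal sketch, using a fixed vector instead of random numbers and a simple ceiling-division chunk size:

use std::thread;

const NTHREADS: usize = 4;

fn main() {
    let numbers: Vec<i32> = (1..=10).collect(); // stand-in for the random vector
    let chunk_size = (numbers.len() + NTHREADS - 1) / NTHREADS; // ceiling division

    let total: i32 = thread::scope(|scope| {
        // the scope guarantees every spawned thread is joined before
        // `numbers` goes out of scope, so borrowing it is fine
        let handles: Vec<_> = numbers
            .chunks(chunk_size)
            .map(|chunk| scope.spawn(move || chunk.iter().sum::<i32>()))
            .collect();
        // no channel: each partial sum comes straight back through `join()`
        handles.into_iter().map(|h| h.join().unwrap()).sum::<i32>()
    });

    println!("sum via scoped threads: {}", total);
    println!("sum single-threaded   : {}", numbers.iter().sum::<i32>());
}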

I followed How do I generate a random number within a range in Rust? to create the random numbers. I also changed the range to [1, 100], since I believe that's what you meant. However, your original code actually produces [-98, 100], which you could also do.
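To see the difference between the two ranges, here is a small standalone sketch (assuming the same rand 0.7 API as above): `random::<i32>() % 100` lands in [-99, 99], so adding 1 gives [-98, 100], while `Uniform::new_inclusive(1, 100)` only ever yields values in [1, 100].

use rand::distributions::{Distribution, Uniform}; // rand 0.7 assumed

fn main() {
    // Original approach: `i32 % 100` is in [-99, 99], so `+ 1` gives [-98, 100].
    let original = (rand::random::<i32>() % 100) + 1;
    assert!((-98..=100).contains(&original));

    // Uniform::new_inclusive(1, 100) only yields values in [1, 100].
    let step = Uniform::new_inclusive(1, 100);
    let uniform = step.sample(&mut rand::thread_rng());
    assert!((1..=100).contains(&uniform));

    println!("original range: {}, uniform range: {}", original, uniform);
}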

Iterator::sum is used to sum up the iterator of numbers.
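As a tiny illustration, the hand-written `sum` helper from the earlier example can be replaced by that trait method:

fn main() {
    let v = vec![1, 2, 3, 4];
    // the annotation (or a turbofish) tells `sum` which numeric type to produce
    let total: i32 = v.iter().cloned().sum();
    assert_eq!(total, 10);
}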

I threw together some rough performance numbers for how the threading behaves, ignoring the vector construction, processing 100,000,000 numbers, using Rust 1.34, and compiling in release mode:

| threads | time (ns) | relative time (%) |
|---------+-----------+-------------------|
|       1 |  33824667 |            100.00 |
|       2 |  16246549 |             48.03 |
|       3 |  16709280 |             49.40 |
|       4 |  14263326 |             42.17 |
|       5 |  14977901 |             44.28 |
|       6 |  12974001 |             38.36 |
|       7 |  13321743 |             39.38 |
|       8 |  13370793 |             39.53 |

See also:

  • How can I pass a reference to a stack variable to a thread?

All of the tasks get a copy of the `scale` variable. Threads 1 and 2 both do the same thing, because each has a `scale` with a value of 0 and modifies it in the same manner as the other thread. The same applies to threads 3 and 4.
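A minimal sketch (separate from your code) that reproduces the symptom: because `i32` is `Copy`, every `move` closure gets a private `scale` starting at 0, so all the threads compute the same value and `main`'s variable never changes.

use std::thread;

fn main() {
    let mut scale = 0; // `i32` is `Copy`, so every `move` closure gets its own copy

    let handles: Vec<_> = (0..4)
        .map(|id| {
            thread::spawn(move || {
                // this `scale` is private to the thread and starts at 0
                scale += 1;
                println!("thread {}: my scale is {}", id, scale); // always 1
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    println!("main's scale is still {}", scale); // unchanged: 0
}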

Rust prevents you from breaking thread safety. If `scale` were shared by the threads, there would be a race condition when the variable is accessed.
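If you actually wanted one shared counter, the compiler makes you reach for a synchronization type. One option (a `Mutex` would work too) is an atomic behind an `Arc`; a minimal sketch:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;

fn main() {
    let scale = Arc::new(AtomicUsize::new(0));

    let handles: Vec<_> = (0..4)
        .map(|_| {
            let scale = Arc::clone(&scale); // each thread gets a handle to the *same* counter
            thread::spawn(move || {
                scale.fetch_add(1, Ordering::SeqCst); // race-free shared update
            })
        })
        .collect();

    for h in handles {
        h.join().unwrap();
    }

    println!("shared scale is now {}", scale.load(Ordering::SeqCst)); // 4
}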

Please read up on closures, which explains the part about variables being copied, and on threads, which explains when and how variables can be shared between threads.
