多线程列表迭代,同时使用Mutex来防止同时处理同一类型



我正在编写一个需要同时在多个线程上运行的应用程序。它将处理一长串项目,其中每个项目的一个属性是user_id。我正在努力确保属于同一user_id的项目永远不会在同一时间处理。这意味着运行子线程的闭包需要等待,直到没有其他线程为同一用户处理数据为止。

我不知道如何解决这个问题。我的简化的当前示例如下:

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use threadpool::ThreadPool;
fn main() {
let pool = ThreadPool::new(num_cpus::get());
let mut locks: HashMap<String, Mutex<bool>> = HashMap::new();
let queue = Arc::new(vec![
"1".to_string(),
"1".to_string(),
"2".to_string(),
"1".to_string(),
"3".to_string(),
]);
let count = queue.len();
for i in 0..count {
let user_id = queue[i].clone();
// Problem: cannot borrow `locks` as mutable more than once at a time
// mutable borrow starts here in previous iteration of loop
let lock = locks.entry(user_id).or_insert(Mutex::new(true));
pool.execute(move || {
// Wait until the user_id becomes free.
lock.lock().unwrap();
// Do stuff with user_id, but never process
// the same user_id more than once at the same time.
println!("{:?}", user_id);
});
}
pool.join();
}

我试图保留一个Mutex的列表,然后用它来等待user_id空闲,但借用检查器不允许这样做。队列项目和项目流程代码在我正在处理的实际应用程序中要复杂得多

我不允许更改队列中项目的顺序(但由于等待锁定,允许进行一些更改(。

如何解决这种情况?

首先,HashMap::entry()会消耗密钥,因此由于您也想在闭包中使用它,因此需要克隆它,即.entry(user_id.clone())

由于需要在主线程和工作线程之间共享Mutex<bool>,因此同样需要将其封装在Arc中。您也可以使用Entry::or_insert_with(),这样就可以避免在不必要的情况下创建新的Mutex

let mut locks: HashMap<String, Arc<Mutex<bool>>> = HashMap::new();
// ...
let lock = locks
.entry(user_id.clone())
.or_insert_with(|| Arc::new(Mutex::new(true)))
.clone();

最后,您必须存储lock()返回的防护,否则它将立即释放。

let _guard = lock.lock().unwrap();

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use threadpool::ThreadPool;
fn main() {
let pool = ThreadPool::new(num_cpus::get());
let mut locks: HashMap<String, Arc<Mutex<bool>>> = HashMap::new();
let queue = Arc::new(vec![
"1".to_string(),
"1".to_string(),
"2".to_string(),
"1".to_string(),
"3".to_string(),
]);
let count = queue.len();
for i in 0..count {
let user_id = queue[i].clone();
let lock = locks
.entry(user_id.clone())
.or_insert_with(|| Arc::new(Mutex::new(true)))
.clone();
pool.execute(move || {
// Wait until the user_id becomes free.
let _guard = lock.lock().unwrap();
// Do stuff with user_id, but never process
// the same user_id more than once at the same time.
println!("{:?}", user_id);
});
}
pool.join();
}

最新更新