使用通知器在线程之间共享状态



我是 Rust 的新手。

我正在尝试编写一个将在创建文件后启动计数器的file_sensor。计划是,一段时间后,如果未收到第二个文件,传感器将以零退出代码退出。

我可以编写代码来继续这项工作,但我觉得下面的代码说明了问题(我也错过了例如提到的 post 函数)

我已经为这个问题苦苦挣扎了几个小时,我已经尝试了 Arc 和互斥锁,甚至是全局变量。

Timer 实现是 Ticktock-rs

我需要能够在 EventKind::Create(CreateKind::Folder) 的匹配正文中获取检测信号,或者在循环中获取file_count

我在此处附加的代码运行,但循环中的file_count始终为零。

use std::env;
use std::path::Path;
use std::{thread, time};
use std::process::ExitCode;
use ticktock::Timer;
use notify::{ 
Watcher, 
RecommendedWatcher, 
RecursiveMode, 
Result, 
event::{EventKind, CreateKind, ModifyKind, Event}
};

fn main() -> Result<()> {
let now = time::Instant::now();
let mut heartbeat = Timer::apply(
|_, count| {
*count += 1;
*count
},
0,
)
.every(time::Duration::from_millis(500))
.start(now);
let mut file_count = 0;
let args = Args::parse();
let REQUEST_SENSOR_PATH = env::var("REQUEST_SENSOR_PATH").expect("$REQUEST_SENSOR_PATH} is not set");
let mut watcher = notify::recommended_watcher(move|res: Result<Event>| {
match res {
Ok(event) => {
match event.kind {
EventKind::Create(CreateKind::File) => {   
file_count += 1;
// do something with file
}
_ => { /* something else changed */ }
}
println!("{:?}", event);
},
Err(e) => {
println!("watch error: {:?}", e);
ExitCode::from(101);
},
}
})?;
watcher.watch(Path::new(&REQUEST_SENSOR_PATH), RecursiveMode::Recursive)?;
loop {
let now = time::Instant::now();
if let Some(n) = heartbeat.update(now){
println!("Heartbeat: {}, fileCount: {}", n, file_count);
if n > 10 {
heartbeat.set_value(0);
// This function will reset timer when a file arrives
}
}
}
Ok(())
}

编译器警告显示问题:

warning: unused variable: `file_count`
--> src/main.rs:31:25
|
31 |                         file_count += 1;
|                         ^^^^^^^^^^
|
= note: `#[warn(unused_variables)]` on by default
= help: did you mean to capture by reference instead?

这里的问题是您在move ||闭包中使用file_countfile_counti32,这是Copy。在move ||闭包中使用它实际上会创建它的副本,如果您分配给它,它将不再更新原始变量。

无论哪种方式,都无法从事件处理程序修改main()中的变量。如果事件处理程序引用事物,则需要'static生存期,因为 Rust 无法保证事件处理程序的生存期比main短。

此问题的一个解决方案是使用引用计数器和内部可变性。在本例中,我将Arc用于参考计数器,AtomicI32用于内部可变性。请注意,notify::recommended_watcher需要线程安全,否则我们可以使用Rc<Cell<i32>>代替Arc<AtomicI32>,这是一回事,但仅适用于单线程环境,开销略低。

use notify::{
event::{CreateKind, Event, EventKind},
RecursiveMode, Result, Watcher,
};
use std::time;
use std::{env, sync::atomic::Ordering};
use std::{path::Path, sync::Arc};
use std::{process::ExitCode, sync::atomic::AtomicI32};
use ticktock::Timer;
fn main() -> Result<()> {
let now = time::Instant::now();
let mut heartbeat = Timer::apply(
|_, count| {
*count += 1;
*count
},
0,
)
.every(time::Duration::from_millis(500))
.start(now);
let file_count = Arc::new(AtomicI32::new(0));
let REQUEST_SENSOR_PATH =
env::var("REQUEST_SENSOR_PATH").expect("$REQUEST_SENSOR_PATH} is not set");
let mut watcher = notify::recommended_watcher({
let file_count = Arc::clone(&file_count);
move |res: Result<Event>| {
match res {
Ok(event) => {
match event.kind {
EventKind::Create(CreateKind::File) => {
file_count.fetch_add(1, Ordering::AcqRel);
// do something with file
}
_ => { /* something else changed */ }
}
println!("{:?}", event);
}
Err(e) => {
println!("watch error: {:?}", e);
ExitCode::from(101);
}
}
}
})?;
watcher.watch(Path::new(&REQUEST_SENSOR_PATH), RecursiveMode::Recursive)?;
loop {
let now = time::Instant::now();
if let Some(n) = heartbeat.update(now) {
println!(
"Heartbeat: {}, fileCount: {}",
n,
file_count.load(Ordering::Acquire)
);
if n > 10 {
heartbeat.set_value(0);
// This function will reset timer when a file arrives
}
}
}
}

另外,请注意,ExitCode::from(101);会给您一个警告。它实际上并没有退出程序,它只创建一个退出代码变量,然后再次丢弃它。您可能打算写std::process::exit(101);.虽然我不鼓励它,因为它没有正确清理(不调用任何Drop实现)。我会在这里使用panic。这正是panic的确切用例。

最新更新