如何在rust中触发异步回调



我试图在Rust中实现StateMachine,但在尝试在spawn线程中触发StateMachine的回调时遇到了一些问题。

这是我的statemmachine结构体。状态是一个通用的T,因为我想在许多不同的场景中使用它,并且我使用Vec来存储注册到此statemmachine的所有回调。

在最开始的时候,我没有使用生命周期'a,但它会遇到一些生命周期问题,所以我根据以下建议添加了生命周期'a:

pub struct StateMachine<'a, T> where T:Clone+Eq+'a {
state: RwLock<T>,
listeners2: Vec<Arc<Mutex<ListenerCallback<'a, T>>>>,
}
pub type ListenerCallback<'a, T> = dyn FnMut(T) -> Result<()> + Send + Sync + 'a ;

当状态改变时,StateMachine将触发所有回调,如下所示。

pub async fn try_set(&mut self, new_state:T) -> Result<()> {
if (block_on(self.state.read()).deref().eq(&new_state)) {
return Ok(())
}
// todo change the state
// fire every listener in spawn
let mut fire_results = vec![];
for listener in &mut self.listeners2 {
let state = new_state.clone();
let fire_listener = listener.clone();
fire_results.push(tokio::spawn(async move {
let mut guard  = fire_listener.lock().unwrap();
guard.deref_mut()(state);
}));
}
// if fire result return Err, return it
for fire_result in fire_results {
fire_result.await?;
}
Ok(())
}

但是会导致编译错误。

error[E0521]: borrowed data escapes outside of associated function
--> src/taf/taf-core/src/execution/state_machine.rs:54:33
|
15 | impl<'a,T> StateMachine<'a,T> where T:Clone+Eq+Send {
|      -- lifetime `'a` defined here
...
34 |     pub async fn try_set(&mut self, new_state:T) -> Result<()> {
|                          --------- `self` is a reference that is only valid in the associated function body
...
54 |             let fire_listener = listener.clone();
|                                 ^^^^^^^^^^^^^^^^
|                                 |
|                                 `self` escapes the associated function body here
|                                 argument requires that `'a` must outlive `'static`

##########################################################

完整的代码加上了很多业务逻辑,所以我重写了2个演示如下,问题是一样的。第一个演示同步触发回调,它工作,第二个演示尝试异步触发回调,它遇到了同样的问题:self在这里转义了相关的函数体。

第一个演示(它工作):

use std::alloc::alloc;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex, RwLock};
use anyhow::Result;
use dashmap::DashMap;
struct StateMachine<'a,T> where T:Clone+Eq+'a {
state: T,
listeners: Vec<Box<Callback<'a, T>>>,
}
type Callback<'a, T> = dyn FnMut(T) -> Result<()> + Send + Sync + 'a;
impl<'a, T> StateMachine<'a,T> where T:Clone+Eq+'a {
pub fn new(init_state: T) -> Self {
StateMachine {
state: init_state,
listeners: vec![]
}
}
pub fn add_listener(&mut self, listener: Box<Callback<'a, T>>) -> Result<()> {
self.listeners.push(listener);
Ok(())
}
pub fn set(&mut self, new_state: T) -> Result<()> {
self.state = new_state.clone();
for listener in &mut self.listeners {
listener(new_state.clone());
}
Ok(())
}
}
#[derive(Clone, Eq, PartialEq, Hash)]
enum ExeState {
Waiting,
Running,
Finished,
Failed,
}
struct Execution<'a> {
exec_id: String,
pub state_machine: StateMachine<'a, ExeState>,
}
struct ExecManager<'a> {
all_jobs: Arc<RwLock<DashMap<String, Execution<'a>>>>,
finished_jobs: Arc<RwLock<Vec<String>>>,
}
impl<'a> ExecManager<'a> {
pub fn new() -> Self {
ExecManager {
all_jobs: Arc::new(RwLock::new(DashMap::new())),
finished_jobs: Arc::new(RwLock::new(vec![]))
}
}
fn add_job(&mut self, job_id: String) {
let mut execution = Execution {
exec_id: job_id.clone(),
state_machine: StateMachine::new(ExeState::Waiting)
};
// add listener
let callback_finished_jobs = self.finished_jobs.clone();
let callback_job_id = job_id.clone();
execution.state_machine.add_listener( Box::new(move |new_state| {
println!("listener fired!, job_id {}", callback_job_id.clone());
if new_state == ExeState::Finished || new_state == ExeState::Failed {
let mut guard = callback_finished_jobs.write().unwrap();
guard.deref_mut().push(callback_job_id.clone());
}
Ok(())
}));
let mut guard = self.all_jobs.write().unwrap();
guard.deref_mut().insert(job_id, execution);
}
fn mock_exec(&mut self, job_id: String) {
let mut guard = self.all_jobs.write().unwrap();
let mut exec = guard.deref_mut().get_mut(&job_id).unwrap();
exec.state_machine.set(ExeState::Finished);
}
}

#[test]
fn test() {
let mut manager = ExecManager::new();
manager.add_job(String::from("job_id1"));
manager.add_job(String::from("job_id2"));
manager.mock_exec(String::from("job_id1"));
manager.mock_exec(String::from("job_id2"));

}

第二个演示:

use std::alloc::alloc;
use std::ops::DerefMut;
use std::sync::{Arc, Mutex, RwLock};
use anyhow::Result;
use dashmap::DashMap;
use petgraph::algo::astar;
struct StateMachine<'a,T> where T:Clone+Eq+Send+'a {
state: T,
listeners: Vec<Arc<Mutex<Box<Callback<'a, T>>>>>,
}
type Callback<'a, T> = dyn FnMut(T) -> Result<()> + Send + Sync + 'a;
impl<'a, T> StateMachine<'a,T> where T:Clone+Eq+Send+'a {
pub fn new(init_state: T) -> Self {
StateMachine {
state: init_state,
listeners: vec![]
}
}
pub fn add_listener(&mut self, listener: Box<Callback<'a, T>>) -> Result<()> {
self.listeners.push(Arc::new(Mutex::new(listener)));
Ok(())
}
pub fn set(&mut self, new_state: T) -> Result<()> {
self.state = new_state.clone();
for listener in &mut self.listeners {
let spawn_listener = listener.clone();
tokio::spawn(async move {
let mut guard = spawn_listener.lock().unwrap();
guard.deref_mut()(new_state.clone());
});
}
Ok(())
}
}
#[derive(Clone, Eq, PartialEq, Hash)]
enum ExeState {
Waiting,
Running,
Finished,
Failed,
}
struct Execution<'a> {
exec_id: String,
pub state_machine: StateMachine<'a, ExeState>,
}
struct ExecManager<'a> {
all_jobs: Arc<RwLock<DashMap<String, Execution<'a>>>>,
finished_jobs: Arc<RwLock<Vec<String>>>,
}
impl<'a> ExecManager<'a> {
pub fn new() -> Self {
ExecManager {
all_jobs: Arc::new(RwLock::new(DashMap::new())),
finished_jobs: Arc::new(RwLock::new(vec![]))
}
}
fn add_job(&mut self, job_id: String) {
let mut execution = Execution {
exec_id: job_id.clone(),
state_machine: StateMachine::new(ExeState::Waiting)
};
// add listener
let callback_finished_jobs = self.finished_jobs.clone();
let callback_job_id = job_id.clone();
execution.state_machine.add_listener( Box::new(move |new_state| {
println!("listener fired!, job_id {}", callback_job_id.clone());
if new_state == ExeState::Finished || new_state == ExeState::Failed {
let mut guard = callback_finished_jobs.write().unwrap();
guard.deref_mut().push(callback_job_id.clone());
}
Ok(())
}));
let mut guard = self.all_jobs.write().unwrap();
guard.deref_mut().insert(job_id, execution);
}
fn mock_exec(&mut self, job_id: String) {
let mut guard = self.all_jobs.write().unwrap();
let mut exec = guard.deref_mut().get_mut(&job_id).unwrap();
exec.state_machine.set(ExeState::Finished);
}
}

#[test]
fn test() {
let mut manager = ExecManager::new();
manager.add_job(String::from("job_id1"));
manager.add_job(String::from("job_id2"));
manager.mock_exec(String::from("job_id1"));
manager.mock_exec(String::from("job_id2"));

}

第二个演示的编译错误:

error[E0521]: borrowed data escapes outside of associated function
--> generic/src/callback2.rs:34:34
|
15 | impl<'a, T> StateMachine<'a,T> where T:Clone+Eq+Send+'a {
|      -- lifetime `'a` defined here
...
29 |     pub fn set(&mut self, new_state: T) -> Result<()> {
|                --------- `self` is a reference that is only valid in the associated function body
...
34 |             let spawn_listener = listener.clone();
|                                  ^^^^^^^^^^^^^^^^
|                                  |
|                                  `self` escapes the associated function body here
|                                  argument requires that `'a` must outlive `'static`
|
= note: requirement occurs because of the type `std::sync::Mutex<Box<dyn FnMut(T) -> Result<(), anyhow::Error> + Send + Sync>>`, which makes the generic argument `Box<dyn FnMut(T) -> Result<(), anyhow::Error> + Send + Sync>` invariant
= note: the struct `std::sync::Mutex<T>` is invariant over the parameter `T`
= help: see <https://doc.rust-lang.org/nomicon/subtyping.html> for more information about variance

tokio::spawn()生成的任务不能使用借来的数据(这里,具有生存期'a的数据,无论它可能是什么)。这是因为目前没有(也可能永远不会有)任何方法可以保证借来的数据比生成的任务更可靠。

你有两个选择:

  1. 触发通知而不生成。您可以将通知期货放入FuturesUnordered以并发地运行它们,但是它们仍然必须在try_set()完成之前全部完成。

  2. 删除寿命参数;停止允许借用数据的回调。在必要的地方把'static放在dyn类型上。更改StateMachine的用户,使他们不尝试使用借来的数据,而是使用Arc,如果必要的话。

    pub struct StateMachine<T> where T: Clone + Eq + 'static {
    state: RwLock<T>,
    listeners2: Vec<Arc<Mutex<ListenerCallback<T>>>>,
    }
    pub type ListenerCallback<T> = dyn FnMut(T) -> Result<()> + Send + Sync + 'static;
    

最新更新