如何限制async_trait函数的返回值以实现同步?



我使用async_traitcrate来定义一个trait上的async方法。我打算将Future转换为Stream,将Stream包装在Arc中,并将Arc发送到多个线程。最小可复制代码如下:

use async_trait::async_trait;
use futures::stream::{unfold, Stream};
use std::sync::Arc;
use tokio::runtime::Runtime;
#[async_trait]
trait Trait: Send + Sync {
async fn do_something(&self) -> i32;
}
async fn use_trait<T: Trait>(x: &T) {
let boxed: Arc<Box<dyn Stream<Item = i32> + Send + Sync>>;
let lazy_poller = unfold(None, move |state| async move {
if let Some(value) = state {
Some((value, Some(value)))
} else {
let value = x.do_something().await;
Some((value, Some(value)))
}
});
boxed = Arc::new(Box::new(lazy_poller));
let boxed_clone = boxed.clone();
let rt = Runtime::new().unwrap();
rt.block_on(async {
let _moved = boxed_clone;
// Do something with `_moved.next()`
});
}

然而,它编译了以下错误:

error: future cannot be shared between threads safely
--> src/main.rs:21:22
|
21 |     boxed = Arc::new(Box::new(lazy_poller));
|                      ^^^^^^^^^^^^^^^^^^^^^ future created by async block is not `Sync`
|
= help: the trait `Sync` is not implemented for `dyn futures::Future<Output = i32> + std::marker::Send`
note: future is not `Sync` as it awaits another future which is not `Sync`
--> src/main.rs:17:25
|
17 |             let value = x.do_something().await;
|                         ^^^^^^^^^^^^^^^^ await occurs here on type `Pin<Box<dyn futures::Future<Output = i32> + std::marker::Send>>`, which is not `Sync`
= note: required for the cast to the object type `dyn Stream<Item = i32> + Sync + std::marker::Send`

似乎async_trait将异步方法的返回类型糖化为Pin<Box<dyn Future<...> + Send>>,而没有指定Sync。然而,我认为除了Send之外,要求Futures为Sync也是相当普遍的。

我的问题是:

  • 我应该如何指定异步方法的返回类型为Sync,和
  • 为什么async_trait为返回类型自动指定Sync?

正如在评论中已经提到的,Futures在大多数用例中不需要是Sync;当它可能从不同的任务轮询时,FutureExt::shared是走的路,这正是我的用例所需要的。我不需要把它变成Stream

对于我的例子,它将是:

use async_trait::async_trait;
use futures::future::FutureExt;
use tokio::runtime::Runtime;
#[async_trait]
trait Trait: Send + Sync {
async fn do_something(&self) -> i32;
}
async fn use_trait<T: Trait>(x: &T) {
let shared = x.do_something().shared();
let shared_clone = shared.clone();
let rt = Runtime::new().unwrap();
rt.block_on(async {
let _moved = shared_clone;
// Do something with `_moved.await`
});
println!("{}", shared.await)
}

最新更新