在迭代器内部调用async函数



我想在迭代器中使用的闭包内awaitasync函数。需要闭包的函数在结构体实现中调用。我不知道该怎么做。

这段代码模拟了我要做的事情:

struct MyType {}
impl MyType {
async fn foo(&self) {
println!("foo");
(0..2).for_each(|v| {
self.bar(v).await;
});
}
async fn bar(&self, v: usize) {
println!("bar: {}", v);
}
}
#[tokio::main]
async fn main() {
let mt = MyType {};
mt.foo().await;
}

显然,这将不起作用,因为关闭不是async,给我:

error[E0728]: `await` is only allowed inside `async` functions and blocks
--> src/main.rs:8:13
|
7 |         (0..2).for_each(|v| {
|                         --- this is not `async`
8 |             self.bar(v).await;
|             ^^^^^^^^^^^^^^^^^ only allowed inside `async` functions and blocks

在寻找如何从非async函数调用async函数的答案后,我需要这个:

tokio::spawn(async move {
self.bar(v).await;
});

但现在我遇到了终身问题:

error[E0759]: `self` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/main.rs:4:18
|
4 |     async fn foo(&self) {
|                  ^^^^^
|                  |
|                  this data with an anonymous lifetime `'_`...
|                  ...is captured here...
...
8 |             tokio::spawn(async move {
|             ------------ ...and is required to live as long as `'static` here

这也不让我感到惊讶,因为据我所知,Rust编译器无法知道线程将存活多长时间。考虑到这一点,用tokio::spawn衍生的线程可能比MyType类型活得更久。

我想出的第一个修复是使bar成为一个关联函数,复制我在闭包中需要的一切,并将其作为值传递给bar,并使用MyType::bar(copies_from_self)调用它,但这变得越来越难看,因为有很多复制。这也感觉像是不知道生命周期是如何工作的一种变通方法。

我试图使用futures::executor::block_on,它适用于简单的任务,如这篇文章中的一个:

(0..2).for_each(|v| {
futures::executor::block_on(self.bar(v));
});

但是,当把它放在我的现实生活中的例子,我使用第三方库1,也使用tokio,事情不再工作。在阅读文档后,我意识到#[tokio::main]是一个宏,最终在block_on中包装一切,因此这样做将有嵌套的block_on。这可能是bar中调用的async方法之一停止工作而没有任何错误或日志记录的原因(没有block_on也可以工作,因此代码不应该有任何问题)。我联系了作者,他们说我可以使用for_each(|i| async move { ... }),这让我更加困惑。

(0..2).for_each(|v| async move {
self.bar(v).await;
});

将导致编译错误

expected `()`, found opaque type`

我认为这是有意义的,因为我现在返回一个未来,而不是()。对此,我天真的做法是尝试这样等待未来:

(0..2).for_each(|v| {
async move {
self.bar(v).await;
}
.await
});

但是这让我回到了原点,导致了以下编译错误,我也认为这是有意义的,因为我现在又回到了在闭包中使用await,即sync

only allowed inside `async` functions and blocks` since the 

这一发现也使我很难利用这里和这里找到的答案。

在所有这些货物崇拜编程之后的问题基本上是,是可能的吗?如果是,我如何从闭包调用async函数?(最好不生成线程,以避免生命周期问题)在迭代器中?如果这是不可能的,那么它的惯用实现是什么样子的呢?

1这是使用的库/方法

Iterator::for_each期望一个同步闭包,因此你不能在其中使用.await(至少不能直接使用),也不能从它返回一个future。

一个解决方案是使用for循环代替.for_each:

for v in 0..2 {
self.bar(v).await;
}
更通用的方法是使用流而不是迭代器,因为它们是异步等效的(流上的等效方法通常也是异步的)。这不仅适用于for_each,也适用于大多数其他迭代器方法:
use futures::prelude::*;
futures::stream::iter(0..2)
.for_each(|c| async move {
self.bar(v).await;
})
.await;

最新更新