与Tokio一起开创非静态的未来



我有一个异步方法,它应该并行执行一些期货,并且只有在所有期货完成后才返回。但是,它通过引用传递一些寿命不如'static的数据(它将在主方法中的某个点被丢弃(。从概念上讲,它类似于这个(游乐场(:

async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in array {
let task = spawn(do_sth(i));
tasks.push(task);
}
for task in tasks {
task.await;
}
}
#[tokio::main]
async fn main() {
parallel_stuff(&[3, 1, 4, 2]);
}

现在,tokio希望传递给spawn的future在'static生存期内有效,因为我可以在future不停止的情况下丢弃句柄。这意味着我上面的例子产生了这个错误消息:

error[E0759]: `array` has an anonymous lifetime `'_` but it needs to satisfy a `'static` lifetime requirement
--> src/main.rs:12:25
|
12 | async fn parallel_stuff(array: &[u64]) {
|                         ^^^^^  ------ this data with an anonymous lifetime `'_`...
|                         |
|                         ...is captured here...
...
15 |         let task = spawn(do_sth(i));
|                    ----- ...and is required to live as long as `'static` here

所以我的问题是:我如何生成只对当前上下文有效的期货,然后我可以等到所有期货都完成?

不可能从async Rust生成非'static未来。这是因为任何异步函数都可能在任何时候被取消,因此无法保证调用方真的比派生任务更长寿。

的确,有各种各样的板条箱允许异步任务的作用域派生,但这些板条箱不能从异步代码中使用。所做的允许从非异步代码中派生作用域异步任务。这并不违反上面的问题,因为派生它们的非异步代码在任何时候都不能取消,因为它不是异步的。

通常有两种方法:

  1. 使用Arc而不是普通引用生成'static任务
  2. 使用未来机箱中的并发原语,而不是派生

通常,要生成静态任务并使用Arc,您必须拥有相关值的所有权。这意味着,由于您的函数引用了参数,因此在不克隆数据的情况下无法使用此技术。

async fn do_sth(with: Arc<[u64]>, idx: usize) {
delay_for(Duration::new(with[idx], 0)).await;
println!("{}", with[idx]);
}
async fn parallel_stuff(array: &[u64]) {
// Make a clone of the data so we can shared it across tasks.
let shared: Arc<[u64]> = Arc::from(array);

let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in 0..array.len() {
// Cloning an Arc does not clone the data.
let shared_clone = shared.clone();
let task = spawn(do_sth(shared_clone, i));
tasks.push(task);
}
for task in tasks {
task.await;
}
}

请注意,如果您有一个对数据的可变引用,并且数据是Sized,即不是切片,则可以暂时拥有它的所有权。

async fn do_sth(with: Arc<Vec<u64>>, idx: usize) {
delay_for(Duration::new(with[idx], 0)).await;
println!("{}", with[idx]);
}
async fn parallel_stuff(array: &mut Vec<u64>) {
// Swap the array with an empty one to temporarily take ownership.
let vec = std::mem::take(array);
let shared = Arc::new(vec);

let mut tasks: Vec<JoinHandle<()>> = Vec::new();
for i in 0..array.len() {
// Cloning an Arc does not clone the data.
let shared_clone = shared.clone();
let task = spawn(do_sth(shared_clone, i));
tasks.push(task);
}
for task in tasks {
task.await;
}

// Put back the vector where we took it from.
// This works because there is only one Arc left.
*array = Arc::try_unwrap(shared).unwrap();
}

另一个选项是使用未来机箱中的并发原语。它们的优点是处理非'static数据,但缺点是任务不能同时在多个线程上运行。

对于许多工作流来说,这是非常好的,因为异步代码应该花费大部分时间等待IO。

一种方法是使用CCD_ 10。这是一个特殊的集合,可以存储许多不同的期货,它有一个next函数,可以同时运行所有期货,并在第一个期货完成后返回。(next功能仅在导入StreamExt时可用(

你可以这样使用它:

use futures::stream::{FuturesUnordered, StreamExt};
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
let mut tasks = FuturesUnordered::new();
for i in array {
let task = do_sth(i);
tasks.push(task);
}
// This loop runs everything concurrently, and waits until they have
// all finished.
while let Some(()) = tasks.next().await { }
}

注意:必须在共享值之后定义FuturesUnordered。否则,您将得到一个借用错误,这是由它们以错误的顺序丢弃引起的。


另一种方法是使用Stream。对于流,您可以使用buffer_unordered。这是一个在内部使用FuturesUnordered的实用程序。

use futures::stream::StreamExt;
async fn do_sth(with: &u64) {
delay_for(Duration::new(*with, 0)).await;
println!("{}", with);
}
async fn parallel_stuff(array: &[u64]) {
// Create a stream going through the array.
futures::stream::iter(array)
// For each item in the stream, create a future.
.map(|i| do_sth(i))
// Run at most 10 of the futures concurrently.
.buffer_unordered(10)
// Since Streams are lazy, we must use for_each or collect to run them.
// Here we use for_each and do nothing with the return value from do_sth.
.for_each(|()| async {})
.await;
}

请注意,在这两种情况下,导入StreamExt都很重要,因为它提供了在不导入扩展特性的情况下无法在流上使用的各种方法。

如果代码使用线程进行并行,可以通过使用transmate延长生存期来避免复制。一个例子:

fn main() {
let now = std::time::Instant::now();
let string = format!("{now:?}");
println!(
"{now:?} has length {}",
parallel_len(&[&string, &string]) / 2
);
}
fn parallel_len(input: &[&str]) -> usize {
// SAFETY: this variable needs to be static, because it is passed into a thread,
// but the thread does not live longer than this function, because we wait for
// it to finish by calling `join` on it.
let input: &[&'static str] = unsafe { std::mem::transmute(input) };
let mut threads = vec![];
for txt in input {
threads.push(std::thread::spawn(|| txt.len()));
}
threads.into_iter().map(|t| t.join().unwrap()).sum()
}

这似乎也适用于异步代码,这是合理的,但我对此了解不多,不能肯定。

当前存在的答案可以归结为:

有可能";"产卵";一个非静态的未来,只要它被限制在与调用方相同的线程中运行。

这让我很不满意。至少从表面上看,应该有可能完全生成一个有作用域的未来,就像生成有作用域线程一样。事实证明,tokio以结构化并发的名义探索了这个想法。不幸的是,他们无法真正使其发挥作用,主要是因为(IIUC(目前不可能以非阻塞+惯用的方式强制执行范围界定。此评论对此进行了更详细的解释。

最新更新