在没有互斥锁的情况下,在长时间和短期运行的异步流之间进行通信?



我有两个流。slow_streamfast_stream.我正在尝试将fast_stream写入由slow_stream结果命名的存储桶中的 s3 .

理想情况下,我会做这样的事情,

while let Some(slow) = slow_stream.next().await {
while let Some(fast) = fast_stream.next().await {
tokio::spawn(async_put_in_s3_bucket(fast,slow));
}
}

如果fast_stream无限期运行,控制流是否返回到此处的外部循环?这是处理这个问题的正确方法吗?它比使用两个tokio::spawn和一个互斥体来通信b/t更快吗?有没有更生锈的方法来实现这一点?似乎有一种方法可以使用编解码器将快速流直接转换为 ByteStream,但我仍然缺少有关如何将其放入 s3 的信息以及慢速信息的内容。

您是否尝试过在两个流上使用tokio::select!,并让外部循环处理存储桶的命名?你的代码的目的有点不清楚,但我可以尝试提供一些伪代码。

let mut bucket = Default::default();
loop {
tokio::select! {
slow = slow_stream.next() => {
bucket = slow;
}
fast = fast_stream.next() => {
// You can add this to an UnorderedFutures and await it in the select.
tokio::spawn(async_put_in_s3_bucket(fast, bucket));
}
}
}

如果你能更清楚地说明,这将使我能够提出一个更好的解决方案。

最新更新