从 tokio 任务中排出 vec<_>

  • 本文关键字:vec tokio 任务 rust
  • 更新时间 :
  • 英文 :


我有一个类型为Vec<i32>的缓冲区,我想将其写入磁盘,并在一定时间后将其写入.drain()。我面临的问题是所有权问题。如何在不获得所有权的情况下将缓冲区的内容写入磁盘和.drain(),或者在这种情况下由于tokio任务而不可能?

use tokio::sync::mpsc;
use std::time::Duration;
use std::fs::File;
use std::io::BufWriter;
use serde_json;
#[tokio::main]
async fn main() {
let mut buffer: Vec<i32> = Vec::new();
let mut interval = tokio::time::interval(Duration::from_secs(60));
let (tx, mut rx) = mpsc::unbounded_channel();
let sender_handle = tokio::spawn(async move {
for i in 0..5 {
tx.send(i).unwrap();
}
});
let receiver_handle = tokio::spawn(async move {
while let Some(x) = rx.recv().await {
buffer.push(x);
}
});
let interval_handle = tokio::spawn(async move {
loop {
interval.tick().await;

let file: File::create("buffer.txt");
let mut writer = BufWriter::new(file.unwrap());
serde_json::to_writer(&mut writer, &buffer);
buffer.drain(..);
}
});
let (_, _, _) = tokio::join!(sender_handle, receiver_handle, interval_handle);
}

您需要添加一些同步机制,以便控制对矢量的访问。Arc<Mutex<Vec<_>>>可以做到:

use futures::lock::Mutex;
use std::sync::Arc;
use tokio::sync::mpsc;
use std::time::Duration;
use std::fs::File;
use std::io::BufWriter;
use serde_json;
#[tokio::main]
async fn main() {
let buffer: Arc<Mutex<Vec<i32>>> = Arc::new(Mutex::new(Vec::new()));
let mut interval = tokio::time::interval(Duration::from_secs(60));
let (tx, mut rx) = mpsc::unbounded_channel();
let sender_handle = tokio::spawn(async move {
for i in 0..5 {
tx.send(i).unwrap();
}
});
let receiver_buff = buffer.clone();
let receiver_handle = tokio::spawn(async move {
while let Some(x) = rx.recv().await {
let mut buff = receiver_buff.lock().await;
buff.push(x);
}
});
let interval_handle = tokio::spawn(async move {
let buffer = buffer.clone();
loop {
interval.tick().await;

let file =  File::create("buffer.txt");
let mut writer = BufWriter::new(file.unwrap());
let mut buff = buffer.lock().await;
serde_json::to_writer(&mut writer, &*buff).expect("Serialization to go well");
buff.drain(..);
}
});
let (_, _, _) = tokio::join!(sender_handle, receiver_handle, interval_handle);
}

游乐场

最新更新