Rust缓存异步特征



当我试图缓存一个有效的值并在它无效时更新它时,我遇到了一个问题。我相信这个问题是由于我试图在异步执行之间共享状态。此外,该组件生活在一个多线程/并发环境中。

我看到的不知道如何修复的错误是

future is not `Send` as this value is used across an await

以下是我可以想出的一个最低限度的例子(它也有一些所有权问题(,通常可以捕捉到我的用例和我看到的问题。这是一个代码的游乐场。

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use std::sync::{Arc, Mutex};
struct Creds {
expires_at: DateTime<Utc>,
}
impl Creds {
fn is_expired(&self) -> bool {
self.expires_at.le(&Utc::now())
}
}
#[async_trait]
trait CredsProvider {
async fn get_creds(&self) -> Creds;
}
struct MyCredsProvider {
cached_creds: Arc<Mutex<Option<Creds>>>,
}
impl MyCredsProvider {
fn new() -> Self {
MyCredsProvider {
cached_creds: Arc::new(Mutex::new(None)),
}
}
async fn inner_get_creds(&self) -> Creds {
todo!()
}
}
#[async_trait]
impl CredsProvider for MyCredsProvider {
async fn get_creds(&self) -> Creds {
let mg = self
.cached_creds
.lock()
.expect("Unable to get lock on creds mutex");
if mg.is_some() && !mg.as_ref().unwrap().is_expired() {
return mg.unwrap();
}
let new_creds = self.inner_get_creds().await;
*mg = Some(new_creds);
return new_creds;
}
}
#[tokio::main]
async fn main() {
MyCredsProvider::new();
// Some multi-threaded / concurrent logic to periodically refresh creds
todo!()
}

我不知道如何在示例中包括这一点,但在main中,想象多个并发/并行运行的工作线程,每个线程都调用CredsProvider.get_creds,然后使用这些cred来执行一些工作(如果你能将其添加到一个完整的工作示例中,那将非常感谢我的启发(。假设MyCredsProvider.inner_get_creds代价高昂,并且应该仅在缓存的cred过期时调用。

我该如何解决此问题?我原以为Arc<Mutex<>>就足够了,但似乎还不够。有一次,我试着制作Creds和trait,这样我就可以拥有Arc<Mutex<Option<Box<dyn Creds + Send + Sync>>>>,但这感觉是错误的,没有奏效。

谢谢。

您可能希望切换到tokio::sync::Mutex(操场(。

它解决了

future is not `Send` as this value is used across an await

代码:

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use std::sync::Arc;
use tokio::sync::Mutex;
#[derive(Clone)]
struct Creds {
expires_at: DateTime<Utc>,
}
impl Creds {
fn is_expired(&self) -> bool {
self.expires_at.le(&Utc::now())
}
}
#[async_trait]
trait CredsProvider {
async fn get_creds(&self) -> Creds;
}
struct MyCredsProvider {
cached_creds: Arc<Mutex<Option<Creds>>>,
}
impl MyCredsProvider {
fn new() -> Self {
MyCredsProvider {
cached_creds: Arc::new(Mutex::new(None)),
}
}
async fn inner_get_creds(&self) -> Creds {
todo!()
}
}
#[async_trait]
impl CredsProvider for MyCredsProvider {
async fn get_creds(&self) -> Creds {
let mut mg = self
.cached_creds
.lock()
.await;
if mg.is_some() && !mg.as_ref().unwrap().is_expired() {
return mg.clone().unwrap();
} else {
let new_creds = self.inner_get_creds().await;
*mg = Some(new_creds.clone());
return new_creds;
}
}
}
#[tokio::main]
async fn main() {
MyCredsProvider::new();
// Some multi-threaded / concurrent logic to periodically refresh creds
todo!()
}

最新更新