Sharing state between an actix-web server and an async closure



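I want to periodically fetch data (using async reqwest) and then serve it at an HTTP endpoint using actix-web as the server. (I have a data source with a fixed format that needs to be read by a service expecting a different format, so I need to transform the data.) I tried to combine the actix-web concepts with the shared-state threading example from the Rust book, but I don't understand the error or how to solve it. This is the code, reduced as far as I could: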

use actix_web::{get, http, web, App, HttpResponse, HttpServer, Responder};
use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};

struct AppState {
    status: String,
}

#[get("/")]
async fn index(data: web::Data<Mutex<AppState>>) -> impl Responder {
    let state = data.lock().unwrap();
    HttpResponse::Ok()
        .insert_header(http::header::ContentType::plaintext())
        .body(state.status.to_owned())
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let status_string = get_state().await.unwrap();
    let app_data = Arc::new(Mutex::new(web::Data::new(AppState {
        status: status_string,
    })));
    let app_data1 = Arc::clone(&app_data);
    actix_web::rt::spawn(async move {
        loop {
            println!("I get executed every 2-ish seconds!");
            sleep(Duration::from_millis(2000)).await;
            let res = get_state().await;
            let mut app_data = app_data1.lock().unwrap();
            // Edit 2: this line is not accepted by the compiler
            // Edit 2: *app_data.status = res.unwrap();
            // Edit 2: but this line is accepted
            *app_data = web::Data::new(AppState { status: res.unwrap() });
        }
    });
    let app_data2 = Arc::clone(&app_data);
    // Edit 2: but I get an error here now
    HttpServer::new(move || App::new().app_data(app_data2).service(index))
        .bind(("127.0.0.1", 9090))?
        .run()
        .await
}

async fn get_state() -> Result<String, Box<dyn std::error::Error>> {
    let client = reqwest::Client::new().get("http://ipecho.net/plain".to_string());
    let status = client.send().await?.text().await?;
    println!("got status: {status}");
    Ok(status)
}

But I get the following errors:

error[E0308]: mismatched types
--> src/main.rs:33:32
|
33 |             *app_data.status = res.unwrap();
|             ----------------   ^^^^^^^^^^^^ expected `str`, found struct `String`
|             |
|             expected due to the type of this binding
error[E0277]: the size for values of type `str` cannot be known at compilation time
--> src/main.rs:33:13
|
33 |             *app_data.status = res.unwrap();
|             ^^^^^^^^^^^^^^^^ doesn't have a size known at compile-time
|
= help: the trait `std::marker::Sized` is not implemented for `str`
= note: the left-hand-side of an assignment must have a statically known size
Some errors have detailed explanations: E0277, E0308.
For more information about an error, try `rustc --explain E0277`.

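Why do I suddenly get a str? Is there a simple fix, or is my whole approach to this problem wrong?

Edit: Perhaps removing the * is the right way to go, as Peter Hall suggested, but that gives me the following errors instead: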

error[E0594]: cannot assign to data in an `Arc`
--> src/main.rs:33:13
|
33 |             app_data.status = res.unwrap();
|             ^^^^^^^^^^^^^^^ cannot assign
|
= help: trait `DerefMut` is required to modify through a dereference, but it is not implemented for `Arc<AppState>`
error[E0507]: cannot move out of `app_data2`, a captured variable in an `Fn` closure
--> src/main.rs:38:49
|
37 |     let app_data2 = Arc::clone(&app_data);
|         --------- captured outer variable
38 |     HttpServer::new(move || App::new().app_data(app_data2).service(index))
|                     -------                     ^^^^^^^^^ move occurs because `app_data2` has type `Arc<std::sync::Mutex<Data<AppState>>>`, which does not implement the `Copy` trait
|                     |
|                     captured by this `Fn` closure
Some errors have detailed explanations: E0507, E0594.
For more information about an error, try `rustc --explain E0507`.
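
For what it's worth, the first two kinds of error can be reproduced without actix-web. Field access binds tighter than the unary *, so *app_data.status is parsed as *(app_data.status), which dereferences the String to an unsized str (hence E0308 and E0277). And because the Mutex in the question wraps a web::Data, the lock guard only reaches an Arc<AppState>, which offers no mutable access (E0594). The following is a minimal sketch using only the standard library; it assumes the Mutex wraps AppState directly, and set_status and get_status are hypothetical helper names:

```rust
use std::sync::{Arc, Mutex};

struct AppState {
    status: String,
}

/// Replaces the status string inside the shared state.
fn set_status(shared: &Arc<Mutex<AppState>>, new_status: String) {
    let mut guard = shared.lock().unwrap();
    // `*guard.status = new_status;` would parse as `*(guard.status) = ...`,
    // i.e. an assignment to a `str`, which the compiler rejects.
    // Plain field assignment auto-dereferences the guard via `DerefMut`;
    // the explicit spelling would be `(*guard).status = new_status;`.
    guard.status = new_status;
}

/// Returns a copy of the current status.
fn get_status(shared: &Arc<Mutex<AppState>>) -> String {
    let guard = shared.lock().unwrap();
    guard.status.clone()
}

fn main() {
    let shared = Arc::new(Mutex::new(AppState {
        status: "old".to_string(),
    }));
    set_status(&shared, "new".to_string());
    println!("{}", get_status(&shared));
}
```

The point of the sketch is the nesting order: the Arc goes on the outside for sharing, the Mutex sits directly around the data that changes.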

Edit 2: I now get the following error (the code changes are marked with "Edit 2" comments above):

error[E0507]: cannot move out of `app_data2`, a captured variable in an `Fn` closure
--> src/main.rs:46:49
|
45 |     let app_data2 = app_data.clone();
|         --------- captured outer variable
46 |     HttpServer::new(move || App::new().app_data(app_data2).service(index))
|                     -------                     ^^^^^^^^^ move occurs because `app_data2` has type `Arc<Mutex<Data<AppState>>>`, which does not implement the `Copy` trait
|                     |
|                     captured by this `Fn` closure
For more information about this error, try `rustc --explain E0507`.
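
The E0507 error comes from the fact that HttpServer::new takes a factory closure that must implement Fn, because it is called once per worker thread. Passing app_data2 by value would move it out of the closure on the first call, leaving nothing for later calls. A minimal sketch of the same situation with only the standard library, where build_workers is a hypothetical stand-in for HttpServer::new:

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for `HttpServer::new`: calls `factory` once per worker.
fn build_workers<F>(factory: F, workers: usize) -> Vec<Arc<Mutex<String>>>
where
    F: Fn() -> Arc<Mutex<String>>,
{
    (0..workers).map(|_| factory()).collect()
}

fn main() {
    let shared = Arc::new(Mutex::new(String::from("initial")));

    // Returning `shared` itself would move it out of the closure, which an
    // `Fn` closure may not do (E0507). Cloning inside the closure hands every
    // call its own handle to the same underlying value.
    let handles = build_workers(move || Arc::clone(&shared), 4);

    // A write through one handle is visible through all the others.
    *handles[0].lock().unwrap() = String::from("updated");
    println!("{}", handles[3].lock().unwrap());
}
```

In the actix-web code this corresponds to cloning the Arc inside the factory closure rather than before it.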

My Cargo.toml dependencies:

[dependencies]
actix-web = "4.2.1"
reqwest = "0.11.12"
tokio = "1.21.2"

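Async/sync solution

Instead of doing the GET asynchronously with reqwest, I solved the problem with the synchronous crate minreq (which I found after a lot of searching). I also chose not to use the #[actix_web::main] macro and instead start the runtime explicitly at the end of the main function.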

use actix_web::{get, http, rt, web, App, HttpResponse, HttpServer, Responder};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

struct AppState {
    status: String,
}

#[get("/")]
async fn index(data: web::Data<Arc<Mutex<AppState>>>) -> impl Responder {
    let state = data.lock().unwrap();
    HttpResponse::Ok()
        .insert_header(http::header::ContentType::plaintext())
        .body(state.status.clone())
}

fn main() -> std::io::Result<()> {
    let status_string = get_state().unwrap();
    let app_data = Arc::new(Mutex::new(AppState {
        status: status_string,
    }));

    // Background thread: refresh the shared state every two seconds.
    let app_data1 = Arc::clone(&app_data);
    thread::spawn(move || loop {
        thread::sleep(Duration::from_millis(2000));
        let res = get_state().unwrap();
        let mut app_data = app_data1.lock().unwrap();
        *app_data = AppState { status: res };
    });

    // Start the actix runtime explicitly instead of using #[actix_web::main].
    rt::System::new().block_on(
        HttpServer::new(move || {
            App::new()
                .app_data(web::Data::new(app_data.clone()))
                .service(index)
        })
        .bind(("127.0.0.1", 9090))?
        .run(),
    )
}

fn get_state() -> Result<String, Box<dyn std::error::Error>> {
    // Propagate request and decoding errors to the caller instead of panicking here.
    let resp = minreq::get("http://ipecho.net/plain").send()?;
    Ok(resp.as_str()?.to_string())
}
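
One thing to be aware of in the code above: if a fetch fails, the unwrap in the background thread panics, the thread ends, and the served status silently stops updating. A possible variation, sketched here with only the standard library, keeps the previous value when a fetch fails. The apply_fetch helper and the simulated fetch results are assumptions made for illustration and stand in for the real get_state call:

```rust
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

struct AppState {
    status: String,
}

/// Applies one fetch result to the shared state; returns true if it was updated.
fn apply_fetch(shared: &Arc<Mutex<AppState>>, result: Result<String, String>) -> bool {
    match result {
        Ok(status) => {
            shared.lock().unwrap().status = status;
            true
        }
        Err(err) => {
            // Keep serving the last good value instead of ending the thread.
            eprintln!("fetch failed, keeping previous status: {err}");
            false
        }
    }
}

fn main() {
    let shared = Arc::new(Mutex::new(AppState {
        status: "initial".into(),
    }));
    let updater_state = Arc::clone(&shared);

    let updater = thread::spawn(move || {
        for attempt in 1..=4 {
            // Simulated stand-in for `get_state()`: every second call fails.
            let result = if attempt % 2 == 0 {
                Err(format!("simulated failure #{attempt}"))
            } else {
                Ok(format!("status #{attempt}"))
            };
            apply_fetch(&updater_state, result);
            thread::sleep(Duration::from_millis(10));
        }
    });

    updater.join().unwrap();
    println!("{}", shared.lock().unwrap().status);
}
```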

Async solution

I had my types mixed up a bit. Making the application state an Arc<Mutex<T>> turned out to be the way to go; an Arc<RwLock<T>> might be even better.

use actix_web::{get, http, web, App, HttpResponse, HttpServer, Responder};
use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};

struct AppState {
    status: String,
}

#[get("/")]
async fn index(data: web::Data<Arc<Mutex<AppState>>>) -> impl Responder {
    let state = data.lock().unwrap();
    HttpResponse::Ok()
        .insert_header(http::header::ContentType::plaintext())
        .body(state.status.to_owned())
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    let status_string = get_state().await.unwrap();
    let app_data = Arc::new(Mutex::new(AppState {
        status: status_string,
    }));
    let app_data1 = app_data.clone();
    actix_web::rt::spawn(async move {
        loop {
            println!("I get executed every 2-ish seconds!");
            sleep(Duration::from_millis(2000)).await;
            let res = get_state().await.unwrap();
            let mut app_data = app_data1.lock().unwrap();
            *app_data = AppState { status: res };
        }
    });
    HttpServer::new(move || {
        App::new()
            .app_data(web::Data::new(app_data.clone()))
            .service(index)
    })
    .bind(("127.0.0.1", 9090))?
    .run()
    .await
}

async fn get_state() -> Result<String, Box<dyn std::error::Error>> {
    let client = reqwest::Client::new().get("http://ipecho.net/plain".to_string());
    let status = client.send().await?.text().await?;
    println!("got status: {status}");
    Ok(status)
}
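
As for the Arc<RwLock<T>> idea mentioned above: since the handlers only read the state and a single task writes it, a read-write lock would let concurrent requests read without blocking each other. The following is a minimal sketch of that access pattern using only the standard library; read_status and write_status are hypothetical helper names, and plain threads stand in for the request handlers and the updater task:

```rust
use std::sync::{Arc, RwLock};
use std::thread;

struct AppState {
    status: String,
}

/// Reader side: many callers may hold a read lock at the same time.
fn read_status(shared: &Arc<RwLock<AppState>>) -> String {
    let guard = shared.read().unwrap();
    guard.status.clone()
}

/// Writer side: takes the exclusive lock only for the assignment.
fn write_status(shared: &Arc<RwLock<AppState>>, status: String) {
    let mut guard = shared.write().unwrap();
    guard.status = status;
}

fn main() {
    let shared = Arc::new(RwLock::new(AppState {
        status: "initial".into(),
    }));

    // One writer, standing in for the periodic updater.
    let writer_state = Arc::clone(&shared);
    let writer = thread::spawn(move || write_status(&writer_state, "updated".into()));
    writer.join().unwrap();

    // Several readers, standing in for concurrent request handlers.
    let readers: Vec<_> = (0..3)
        .map(|_| {
            let reader_state = Arc::clone(&shared);
            thread::spawn(move || read_status(&reader_state))
        })
        .collect();
    for reader in readers {
        println!("{}", reader.join().unwrap());
    }
}
```

In the handler, the extractor type would presumably become web::Data<Arc<RwLock<AppState>>>, with data.read().unwrap() in place of data.lock().unwrap().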
