传递Actix Web负载到功能



我想从我的路由处理程序中提取一些有效负载处理逻辑。我不知道如何恰当地处理这种类型。具体来说,我想将有效负载标记为Send安全,但是编译器抱怨我不能跨线程发送有效负载。我如何保证这个有效负载将只由相关的工作线程处理?

假设我有一个路由

#[post("/upload")]
pub async fn upload(mut body: Payload) -> Result<HttpResponse> {
Handler {}.handle_payload(body).await?;
}

这是许多可能的处理程序之一,因此我定义了一个带有实现的简单trait。

#[async_trait::async_trait]
pub trait Handle {
async fn handle_payload(&self, s: impl Stream + Send) -> Result<()>;
}
pub struct Handler {}
#[async_trait::async_trait]
impl Handle for Handler {
async fn handle_payload(&self, s: impl Stream + Send) -> Result<()> {
Ok(())
}
}

这会导致编译器提示我不能在线程之间发送Payload。我如何将异步有效载荷处理器排除在路由之外?

|     MyHandler {}.handle_payload(body).await?;
|                  -------------- ^^^^ `Rc<RefCell<actix_http::h1::payload::Inner>>` cannot be sent between threads safely
|                  |
|                  required by a bound introduced by this call
|
= help: within `actix_web::web::Payload`, the trait `std::marker::Send` is not implemented for `Rc<RefCell<actix_http::h1::payload::Inner>>`
= note: required because it appears within the type `actix_http::h1::payload::Payload`
= note: required because it appears within the type `actix_web::dev::Payload`
= note: required because it appears within the type `actix_web::web::Payload`

|     MyHandler {}.handle_payload(body).await?;
|                  -------------- ^^^^ `(dyn futures_util::Stream<Item = Result<actix_web::web::Bytes, PayloadError>> + 'static)` cannot be sent between threads safely
|                  |
|                  required by a bound introduced by this call
|
= help: the trait `std::marker::Send` is not implemented for `(dyn futures_util::Stream<Item = Result<actix_web::web::Bytes, PayloadError>> + 'static)`
= note: required for `Unique<(dyn futures_util::Stream<Item = Result<actix_web::web::Bytes, PayloadError>> + 'static)>` to implement `std::marker::Send`
= note: required because it appears within the type `Box<(dyn futures_util::Stream<Item = Result<actix_web::web::Bytes, PayloadError>> + 'static)>`
= note: required because it appears within the type `Pin<Box<(dyn futures_util::Stream<Item = Result<actix_web::web::Bytes, PayloadError>> + 'static)>>`
= note: required because it appears within the type `actix_web::dev::Payload`
= note: required because it appears within the type `actix_web::web::Payload`

Actix web不要求其处理程序是线程安全的。web服务器使用多个线程,但是一旦请求被分配给一个线程,它就不会移动。因此,许多特定于activity的类型(如Payload)不是线程安全的,因为它们不需要线程安全。

然而,async_trait假定异步函数在默认情况下应该是线程安全的。您可以通过在注释中传递?Send来放松此约束:

#[async_trait::async_trait(?Send)]
pub trait Handle {
async fn handle_payload(&self, s: impl Stream + Send) -> Result<()>;
}
pub struct Handler {}
#[async_trait::async_trait(?Send)]
impl Handle for Handler {
async fn handle_payload(&self, s: impl Stream + Send) -> Result<()> {
Ok(())
}
}

参见async-trait文档中的非线程安全future。

最新更新