Why is my program freezing on epoll_wait?



I have a few structs for deserializing requests coming from GCP alerting. The top-level struct implements FromRequest, and the nested structs all implement serde's Deserialize and Serialize traits.

Here are the structs (the ones that are simply populated by serde are omitted for brevity):

#[derive(Debug, Clone, PartialEq)]
struct GcpAlert {
    pub headers: GcpHeaders,
    pub body: GcpBody,
}

#[async_trait]
impl FromRequest<Body> for GcpAlert {
    type Rejection = StatusCode;

    async fn from_request(req: &mut RequestParts<Body>) -> Result<Self, Self::Rejection> {
        let body = GcpBody::from_request(req).await?;
        let headers = GcpHeaders::from_request(req).await?;
        Ok(Self { headers, body })
    }
}
#[derive(Debug, Clone)]
struct GcpHeaders {
    pub host: TypedHeader<Host>,
    pub content_length: TypedHeader<ContentLength>,
    pub content_type: TypedHeader<ContentType>,
    pub user_agent: TypedHeader<UserAgent>,
}

#[async_trait]
impl FromRequest<Body> for GcpHeaders {
    type Rejection = StatusCode;

    async fn from_request(req: &mut RequestParts<Body>) -> Result<Self, Self::Rejection> {
        let bad_req = StatusCode::BAD_REQUEST;
        let host: TypedHeader<Host> =
            TypedHeader::from_request(req).await.map_err(|_| bad_req)?;
        let content_length: TypedHeader<ContentLength> =
            TypedHeader::from_request(req).await.map_err(|_| bad_req)?;
        let content_type: TypedHeader<ContentType> =
            TypedHeader::from_request(req).await.map_err(|_| bad_req)?;
        let user_agent: TypedHeader<UserAgent> =
            TypedHeader::from_request(req).await.map_err(|_| bad_req)?;
        Ok(Self {
            host,
            content_length,
            content_type,
            user_agent,
        })
    }
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq)]
struct GcpBody {
    pub incident: GcpIncident,
    pub version: Box<str>,
}

#[async_trait]
impl FromRequest<Body> for GcpBody {
    type Rejection = StatusCode;

    async fn from_request(req: &mut RequestParts<Body>) -> Result<Self, Self::Rejection> {
        let serv_err = StatusCode::INTERNAL_SERVER_ERROR;
        let bad_req = StatusCode::BAD_REQUEST;
        let body = req.body_mut().as_mut().ok_or(serv_err)?;
        let buffer = body::to_bytes(body).await.map_err(|_| serv_err)?;
        Ok(serde_json::from_slice(&buffer).map_err(|_| bad_req)?)
    }
}

To test this, I wrote a test that compares a manually instantiated GcpAlert with the one created through axum. Note that I've left out the details of the manually constructed struct and of the request, because I was sure they were irrelevant.

#[tokio::test]
async fn test_request_deserialization() {
    async fn handle_alert(alert: GcpAlert) {
        let expected = GcpAlert {
            headers: GcpHeaders { /* headers */ },
            body: GcpBody { /* body */ },
        };
        assert_eq!(alert, expected);
    }

    let app = Router::new().route("/", post(handle_alert));
    // TestClient is similar to this: https://github.com/tokio-rs/axum/blob/main/axum/src/test_helpers/test_client.rs
    let client = TestClient::new(app);
    client.post("/")
        .header("host", "<host>")
        .header("content-length", 1024)
        .header("content-type", ContentType::json().to_string())
        .header("user-agent", "<user-agent>")
        .header("accept-encoding", "gzip, deflate, br")
        .body(/* body */)
        .send()
        .await;
}

My problem is that the program freezes on the following line of GcpBody's FromRequest impl:

let buffer = body::to_bytes(body).await.map_err(|_| serv_err)?;
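
For reference, one way to make the hang show up as an error instead of blocking forever would be to wrap the read in a timeout. This is only a rough sketch that would slot in for the line above, not part of my original code, and the five-second limit is an arbitrary value:

// Sketch only: turn an indefinite wait on the body into an error.
// Requires tokio's "time" feature; `body` and `serv_err` come from the
// surrounding from_request code shown earlier.
use std::time::Duration;
use tokio::time::timeout;

let buffer = timeout(Duration::from_secs(5), body::to_bytes(body))
    .await
    .map_err(|_elapsed| serv_err)?   // the read never completed in time
    .map_err(|_read_err| serv_err)?; // the read itself failed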

I tried to debug the issue, but I'm not very familiar with assembly/LLVM/etc. It looks like two threads are active at this point. I can artificially increase the number of threads by using the multi-threaded attribute on the test, but that doesn't change the end result, it just produces a bigger call stack.

Thread 1 call stack:

syscall (@syscall:12)
std::sys::unix::futex::futex_wait (@std::sys::unix::futex::futex_wait:64)
std::sys_common::thread_parker::futex::Parker::park_timeout (@std::thread::park_timeout:25)
std::thread::park_timeout (@std::thread::park_timeout:18)
std::sync::mpsc::blocking::WaitToken::wait_max_until (@std::sync::mpsc::blocking::WaitToken::wait_max_until:18)
std::sync::mpsc::shared::Packet<T>::recv (@std::sync::mpsc::shared::Packet<T>::recv:94)
std::sync::mpsc::Receiver<T>::recv_deadline (@test::run_tests:1771)
std::sync::mpsc::Receiver<T>::recv_timeout (@test::run_tests:1696)
test::run_tests (@test::run_tests:1524)
test::console::run_tests_console (@test::console::run_tests_console:290)
test::test_main (@test::test_main:102)
test::test_main_static (@test::test_main_static:34)
gcp_teams_alerts::main (/home/ak_lo/ドキュメント/Rust/gcp-teams-alerts/src/main.rs:1)
core::ops::function::FnOnce::call_once (@core::ops::function::FnOnce::call_once:6)
std::sys_common::backtrace::__rust_begin_short_backtrace (@std::sys_common::backtrace::__rust_begin_short_backtrace:6)
std::rt::lang_start::{{closure}} (@std::rt::lang_start::{{closure}}:7)
core::ops::function::impls::<impl core::ops::function::FnOnce<A> for &F>::call_once (@std::rt::lang_start_internal:242)
std::panicking::try::do_call (@std::rt::lang_start_internal:241)
std::panicking::try (@std::rt::lang_start_internal:241)
std::panic::catch_unwind (@std::rt::lang_start_internal:241)
std::rt::lang_start_internal::{{closure}} (@std::rt::lang_start_internal:241)
std::panicking::try::do_call (@std::rt::lang_start_internal:241)
std::panicking::try (@std::rt::lang_start_internal:241)
std::panic::catch_unwind (@std::rt::lang_start_internal:241)
std::rt::lang_start_internal (@std::rt::lang_start_internal:241)
std::rt::lang_start (@std::rt::lang_start:13)
main (@main:10)
___lldb_unnamed_symbol3139 (@___lldb_unnamed_symbol3139:29)
__libc_start_main (@__libc_start_main:43)
_start (@_start:15)

Thread 2 call stack:

epoll_wait (@epoll_wait:27)
mio::sys::unix::selector::epoll::Selector::select (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/mio-0.8.4/src/sys/unix/selector/epoll.rs:68)
mio::poll::Poll::poll (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/mio-0.8.4/src/poll.rs:400)
tokio::runtime::io::Driver::turn (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/io/mod.rs:162)
<tokio::runtime::io::Driver as tokio::park::Park>::park (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/io/mod.rs:227)
<tokio::park::either::Either<A,B> as tokio::park::Park>::park (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/park/either.rs:30)
tokio::time::driver::Driver<P>::park_internal (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/time/driver/mod.rs:238)
<tokio::time::driver::Driver<P> as tokio::park::Park>::park (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/time/driver/mod.rs:436)
<tokio::park::either::Either<A,B> as tokio::park::Park>::park (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/park/either.rs:30)
<tokio::runtime::driver::Driver as tokio::park::Park>::park (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/driver.rs:198)
tokio::runtime::scheduler::current_thread::Context::park::{{closure}} (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:308)
tokio::runtime::scheduler::current_thread::Context::enter (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:349)
tokio::runtime::scheduler::current_thread::Context::park (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:307)
tokio::runtime::scheduler::current_thread::CoreGuard::block_on::{{closure}} (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:554)
tokio::runtime::scheduler::current_thread::CoreGuard::enter::{{closure}} (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:595)
tokio::macros::scoped_tls::ScopedKey<T>::set (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/macros/scoped_tls.rs:61)
tokio::runtime::scheduler::current_thread::CoreGuard::enter (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:595)
tokio::runtime::scheduler::current_thread::CoreGuard::block_on (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:515)
tokio::runtime::scheduler::current_thread::CurrentThread::block_on (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/scheduler/current_thread.rs:161)
tokio::runtime::Runtime::block_on (/home/ak_lo/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.0/src/runtime/mod.rs:490)

While stepping through, I noticed that the Data struct gets polled twice, and the context and state don't seem to match (nothing should be returned on the second poll; I've confirmed all of the data was already returned by the first poll). Does anyone know why the program keeps waiting for new data that is definitely never going to arrive?
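
My rough mental model of what body::to_bytes is doing (a simplified sketch of my understanding, not hyper's actual source) is that it keeps asking the body for data frames until the body reports the end of the stream, and for an HTTP/1.1 request with a Content-Length header that end-of-stream signal only comes once that many bytes have arrived:

// Simplified sketch (my understanding, not hyper's real implementation):
// keep polling for data frames until the body reports end-of-stream.
use bytes::{Bytes, BytesMut};
use http_body::Body as HttpBody;

async fn collect_body<B>(mut body: B) -> Result<Bytes, B::Error>
where
    B: HttpBody<Data = Bytes> + Unpin,
{
    let mut buf = BytesMut::new();
    // With a Content-Length header, `data()` only starts returning `None`
    // once that many bytes have been read from the socket.
    while let Some(chunk) = body.data().await {
        buf.extend_from_slice(&chunk?);
    }
    Ok(buf.freeze())
}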

Edit: for testing purposes, I changed the line that causes the freeze to:

let mut body = BodyStream::from_request(req).await.map_err(|_| serv_err)?.take(1);
let buffer = {
    let mut buf = Vec::new();
    while let Some(chunk) = body.next().await {
        let data = chunk.map_err(|_| serv_err)?;
        buf.extend(data);
    }
    buf
};

With this the test passes. But if I increase the take count above 1, the same problem shows up again.

My assumption that the request contents were irrelevant was wrong. I had set an incorrect content-length when building the test, and it turns out axum was simply waiting the whole time for the rest of the body.
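
For completeness, here's roughly how I'd build the request now so the header can't drift from the payload. This is only a sketch: `expected_body` stands in for the manually built GcpBody the test already constructs, and it assumes TestClient accepts a Vec<u8> body like the axum helper it's modeled on.

// Sketch: serialize the expected body once and derive content-length from it
// instead of hard-coding 1024 (alternatively, drop the header and let the
// client compute it from the body).
let payload = serde_json::to_vec(&expected_body).unwrap();
client.post("/")
    .header("host", "<host>")
    .header("content-length", payload.len())
    .header("content-type", ContentType::json().to_string())
    .header("user-agent", "<user-agent>")
    .header("accept-encoding", "gzip, deflate, br")
    .body(payload)
    .send()
    .await;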
