如何在多线程tokio中取消未来/关闭流



基于tokio在https://github.com/tokio-rs/tokio/blob/master/examples/proxy.rs

let (mut ri, mut wi) = inbound.split();
let (mut ro, mut wo) = outbound.split();
let client_to_server = io::copy(&mut ri, &mut wo);
let server_to_client = io::copy(&mut ro, &mut wi);
try_join(client_to_server, server_to_client).await?;
Ok(())

我有一个修改过的版本,这样我就可以处理每个连接的终止,如下所示:

// Server will disconnect their side normally 8s later, from what I've observed
let server_to_client = io::copy(&mut ro, &mut wi).map(|f| {
server_session_time = server_start_time.elapsed().unwrap();
f
});
// Normally, this will stop first, as the client disconnects as soon as he has the results...
let client_to_server = io::copy(&mut ri, &mut wo).map(|f| {
client_session_time = client_start_time.elapsed().unwrap();
f
});

// Join on both
match try_join(client_to_server, server_to_client).await {...}

这使我能够正确地为客户端计时连接时间,因为客户端在收到答案后立即关闭连接,而代理服务器似乎需要(在我的情况下是8s(才能关闭。

考虑到这种代码结构,一旦我退出client_to_server的未来,是否有可能终止server_to_client的下游连接(即,不要等待我观察到的关闭所需的8秒(?

好的,再举几个例子,我就能理解我必须做什么了。对于将来回到这个问题的任何人来说,需要的是你自己实现双向复制,基于tokio::select!的每个读写的4个未来!。这将允许访问所有流,当其中一个流终止时,如果您想完成对其他流的处理或停止,您可以选择。如上所述,没有办法"取消"其他"副本。。。

您可以同时查看io::copy的实现https://github.com/tokio-rs/tokio-io/blob/master/src/copy.rs和tokio::selecthttps://docs.rs/tokio/0.2.20/tokio/macro.select.html,构建您的4向选择。

最新更新