Rust多线程异步Websocket服务器



我想学习Rust,所以我决定把它用于现实世界的项目。

这个想法是有一个

主线程A
  1. 生成一个新线程B,该线程执行一些异步任务,该任务通过时间生成一个值流

  2. 异步接收客户端websocket连接[c, d, e, ..],并同时处理它们生成新线程[C, D, E, ...]

  3. 将线程B中产生的值发送到线程[C, D, E, ...]

  4. [C, D, E, ...]中的每个线程将值发布到[c, d, e, ..]中各自的客户端

我正在使用

  • tokio生成新线程,tokio::sync::mpsc::unbounded_channelB中计算的值发送到其他线程

  • tokio_tungstenite管理websocket连接并向客户端发送值

我设法得到了一个工作示例,其中线程B生成整数和固定时间间隔。当服务器启动时,B开始产生值流[0,1,2,3, ..]

当打开新的websocket连接时,客户端将从连接打开后产生的值开始接收数据流(因此,如果连接在B产生值3之后开始,则客户端将从4开始接收值(。

这是陷阱。

我发现C中通道的接收部分异步接收值的唯一方法(因此防止它在B完全完成时缓冲值并将其发送到c(是使用一个我认为消耗100%CPU的循环。

我注意到,正因为如此,每个websocket连接都将消耗100%的CPU(因此,如果有两个连接,则打开的CPU使用率将为200%,依此类推(。

这是一个循环:

loop {
while let Ok(v) = rx.try_recv() {
println!("PRINTER ID [{}] | RECEIVED: {:#?}", addr, v);
println!("PRINTER ID [{}] | SENDING TO WS: {:#?}", addr, v);
let mess = Message::Text(v.to_string());ws_sender.send(mess).await?;
}

如果我使用recv()(而不是try_recv()(,则值将在B完成时被缓冲并释放到websocket。

我尝试使用futures_channel::unbounded而不是tokio通道,但我遇到了同样的缓冲区问题。

问题:如何重写上面的循环以避免使用100%和流值到websocket而不阻塞?

你可以在这里看到tokio服务器:https://github.com/ceikit/async_data/blob/master/src/bin/tokio_server.rs

您可以通过在另一个运行客户端的终端窗口中旋转websocket连接来测试它

需要将thread::sleep更改为使用futures-timer,将sync::Mutex更改为futures::lock::Mutex,则带有recv()while-let可以完美地进行

最新更新