我想学习Rust,所以我决定把它用于现实世界的项目。
这个想法是有一个
主线程A
的生成一个新线程
B
,该线程执行一些异步任务,该任务通过时间生成一个值流异步接收客户端websocket连接
[c, d, e, ..]
,并同时处理它们生成新线程[C, D, E, ...]
将线程B中产生的值发送到线程
[C, D, E, ...]
[C, D, E, ...]
中的每个线程将值发布到[c, d, e, ..]
中各自的客户端
我正在使用
tokio
生成新线程,tokio::sync::mpsc::unbounded_channel
将B
中计算的值发送到其他线程tokio_tungstenite
管理websocket连接并向客户端发送值
我设法得到了一个工作示例,其中线程B
生成整数和固定时间间隔。当服务器启动时,B
开始产生值流[0,1,2,3, ..]
。
当打开新的websocket连接时,客户端将从连接打开后产生的值开始接收数据流(因此,如果连接在B
产生值3
之后开始,则客户端将从4
开始接收值(。
这是陷阱。
我发现C
中通道的接收部分异步接收值的唯一方法(因此防止它在B
完全完成时缓冲值并将其发送到c
(是使用一个我认为消耗100%CPU的循环。
我注意到,正因为如此,每个websocket连接都将消耗100%的CPU(因此,如果有两个连接,则打开的CPU使用率将为200%,依此类推(。
这是一个循环:
loop {
while let Ok(v) = rx.try_recv() {
println!("PRINTER ID [{}] | RECEIVED: {:#?}", addr, v);
println!("PRINTER ID [{}] | SENDING TO WS: {:#?}", addr, v);
let mess = Message::Text(v.to_string());ws_sender.send(mess).await?;
}
如果我使用recv()
(而不是try_recv()
(,则值将在B
完成时被缓冲并释放到websocket。
我尝试使用futures_channel::unbounded
而不是tokio
通道,但我遇到了同样的缓冲区问题。
问题:如何重写上面的循环以避免使用100%和流值到websocket而不阻塞?
你可以在这里看到tokio服务器:https://github.com/ceikit/async_data/blob/master/src/bin/tokio_server.rs
您可以通过在另一个运行客户端的终端窗口中旋转websocket连接来测试它
需要将thread::sleep
更改为使用futures-timer
,将sync::Mutex
更改为futures::lock::Mutex
,则带有recv()
的while-let
可以完美地进行