[东京-RS][文档]具有共享状态示例的多个异步"sub-apps"?



Node.js应用程序的一种常见模式是将它们拆分为许多共享某些状态的"子应用程序"。当然,所有的"子应用程序"都应该异步处理。

下面是这样一个Node应用程序的简单示例,它有三个"子应用程序":

  1. 间隔计时器=>每10秒,共享itv_counter递增
  2. TCP服务器=>对于接收到的每一条TCP消息,共享tcp_counter都会递增
  3. UDP服务器=>对于接收到的每一条UDP消息,共享udp_counter都会递增

每次递增其中一个计数器时,必须打印所有三个计数器(因此"子应用程序"需要共享状态(。

这是Node中的一个实现。Node的好处在于,您可以假设默认情况下几乎所有的I/O操作都是异步处理的。开发人员没有认知开销。

const dgram = require('dgram');
const net = require('net');
const tcp_port = 3000;
const udp_port = 3001;
const tcp_listener = net.createServer();
const udp_listener = dgram.createSocket('udp4');
// state shared by the 3 asynchronous applications
const shared_state = {
itv_counter: 0,
tcp_counter: 0,
udp_counter: 0,
};
// itv async app: increment itv_counter every 10 seconds and print shared state
setInterval(() => {
shared_state.itv_counter += 1;
console.log(`itv async app: ${JSON.stringify(shared_state)}`);
}, 10_000);
// tcp async app: increment tcp_counter every time a TCP message is received and print shared state
tcp_listener.on('connection', (client) => {
client.on('data', (_data) => {
shared_state.tcp_counter += 1;
console.log(`tcp async app: ${JSON.stringify(shared_state)}`);
});
});
tcp_listener.listen(tcp_port, () => {
console.log(`TCP listener on port ${tcp_port}`);
});
// udp async app: increment udp_counter every time a UDP message is received and print shared state
udp_listener.on('message', (_message, _client) => {
shared_state.udp_counter += 1;
console.log(`udp async app: ${JSON.stringify(shared_state)}`);
});
udp_listener.on('listening', () => {
console.log(`UDP listener on port ${udp_port}`);
});
udp_listener.bind(udp_port);

现在,这里是Rust中的一个实现,Tokio作为异步运行时。

use std::sync::{Arc, Mutex};
use std::time::Duration;
use tokio::net::{TcpListener, UdpSocket};
use tokio::prelude::*;
// state shared by the 3 asynchronous applications
#[derive(Clone, Debug)]
struct SharedState {
state: Arc<Mutex<State>>,
}
#[derive(Debug)]
struct State {
itv_counter: usize,
tcp_counter: usize,
udp_counter: usize,
}
impl SharedState {
fn new() -> SharedState {
SharedState {
state: Arc::new(Mutex::new(State {
itv_counter: 0,
tcp_counter: 0,
udp_counter: 0,
})),
}
}
}
#[tokio::main]
async fn main() {
let shared_state = SharedState::new();
// itv async app: increment itv_counter every 10 seconds and print shared state
let itv_shared_state = shared_state.clone();
let itv_handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(10));
interval.tick().await;
loop {
interval.tick().await;
let mut state = itv_shared_state.state.lock().unwrap();
state.itv_counter += 1;
println!("itv async app: {:?}", state);
}
});
// tcp async app: increment tcp_counter every time a TCP message is received and print shared state
let tcp_shared_state = shared_state.clone();
let tcp_handle = tokio::spawn(async move {
let mut tcp_listener = TcpListener::bind("127.0.0.1:3000").await.unwrap();
println!("TCP listener on port 3000");
while let Ok((mut tcp_stream, _)) = tcp_listener.accept().await {
let tcp_shared_state = tcp_shared_state.clone();
tokio::spawn(async move {
let mut buffer = [0; 1024];
while let Ok(byte_count) = tcp_stream.read(&mut buffer).await {
if byte_count == 0 {
break;
}
let mut state = tcp_shared_state.state.lock().unwrap();
state.tcp_counter += 1;
println!("tcp async app: {:?}", state);
}
});
}
});
// udp async app: increment udp_counter every time a UDP message is received and print shared state
let udp_shared_state = shared_state.clone();
let udp_handle = tokio::spawn(async move {
let mut udp_listener = UdpSocket::bind("127.0.0.1:3001").await.unwrap();
println!("UDP listener on port 3001");
let mut buffer = [0; 1024];
while let Ok(_byte_count) = udp_listener.recv(&mut buffer).await {
let mut state = udp_shared_state.state.lock().unwrap();
state.udp_counter += 1;
println!("udp async app: {:?}", state);
}
});
itv_handle.await.unwrap();
tcp_handle.await.unwrap();
udp_handle.await.unwrap();
}

首先,由于我对Tokio和async Rust还不是很满意,所以在这个实现中可能存在一些完全错误的地方,或者是糟糕的做法。如果是这样的话,请告诉我(例如,我不知道最后是否需要三个JoinHandle.await(。也就是说,它的行为与我的简单测试中的Node实现相同。

但我仍然不确定它在异步性方面是否等效。Node应用程序中的每个回调都应该有一个tokio::spawn吗?在这种情况下,我应该将tcp_stream.read()udp_listener.recv()封装在另一个tokio::spawn中,以分别模拟TCP的on('data')和UDP的on('message')的Node回调。不确定。。。

在异步性方面,与Node.js应用程序完全等效的tokio实现是什么?一般来说,什么是一个很好的经验法则来知道什么时候应该用tokio::spawn包起来?

我看到您的任务有三个不同的计数器,所以我认为有一种有意义的方法可以使用状态结构的令牌并在任务之间转换它。因此,每个任务都有责任更新自己的计数器。作为一个建议,我建议使用tokio::sync::mpsc::channel并实现三个mpsc值,每个值从一个任务指向下一个任务。

当然,如果任务之间存在更新周期差异,则存在某些值更新有点晚的风险,但我认为在一般情况下,这是可以忽略的。

最新更新