我正在尝试使用 Rust-Websocket 来创建一个简单的聊天室,多人可以在其中相互交谈。
我查看了示例,"server.rs"和"websockets.html对我来说似乎是一个不错的起点。所以我只是尝试启动它并从网络连接。一切正常,但我只能与自己通信,而不能与其他连接通信(因为它将消息直接发送回sender
而不是每个连接)。
所以我试图获得一个包含所有senders
/clients
的向量,这样我就可以迭代它们并将消息发送给每个向量,但这似乎是有问题的。我无法传达sender
或client
,因为它不是线程安全的,我也无法复制其中任何一个。
不确定我是否只是不了解整个借款 100%,或者它是否不打算像这样进行交叉连接通信。
server.rs:
https://github.com/cyderize/rust-websocket/blob/master/examples/server.rs
网络套接字.html:
https://github.com/cyderize/rust-websocket/blob/master/examples/websockets.html
我可能从错误的方向接近这一点。与所有其他线程共享收到的消息可能更容易。我想了一下,但我唯一能想到的就是使用 channels
从线程内部向外部发送消息。有没有办法直接在线程之间广播消息?我需要做的就是将字符串从一个线程发送到另一个线程。
所以这并不像人们想象的那么简单。
基本上,我使用了一个调度程序线程,它将充当所有连接客户端的控制中心。因此,每当客户端收到消息时,它都会发送到调度程序,然后将消息分发到每个连接的客户端。
我还必须在另一个线程中接收消息,因为在 rust-websocket 中没有非阻塞方式接收消息。然后,我只能使用一个永久循环来检查从 websocket 和调度程序接收的新消息。
以下是我的代码最终的样子:
extern crate websocket;
use std::str;
use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use std::thread;
use websocket::{Server, Message, Sender, Receiver};
use websocket::header::WebSocketProtocol;
use websocket::message::Type;
fn main() {
let server = Server::bind("0.0.0.0:2794").unwrap();
let (dispatcher_tx, dispatcher_rx) = mpsc::channel::<String>();
let client_senders: Arc<Mutex<Vec<mpsc::Sender<String>>>> = Arc::new(Mutex::new(vec![]));
// dispatcher thread
{
let client_senders = client_senders.clone();
thread::spawn(move || {
while let Ok(msg) = dispatcher_rx.recv() {
for sender in client_senders.lock().unwrap().iter() {
sender.send(msg.clone()).unwrap();
}
}
});
}
// client threads
for connection in server {
let dispatcher = dispatcher_tx.clone();
let (client_tx, client_rx) = mpsc::channel();
client_senders.lock().unwrap().push(client_tx);
// Spawn a new thread for each connection.
thread::spawn(move || {
let request = connection.unwrap().read_request().unwrap(); // Get the request
let headers = request.headers.clone(); // Keep the headers so we can check them
request.validate().unwrap(); // Validate the request
let mut response = request.accept(); // Form a response
if let Some(&WebSocketProtocol(ref protocols)) = headers.get() {
if protocols.contains(&("rust-websocket".to_string())) {
// We have a protocol we want to use
response.headers.set(WebSocketProtocol(vec!["rust-websocket".to_string()]));
}
}
let mut client = response.send().unwrap(); // Send the response
let ip = client.get_mut_sender()
.get_mut()
.peer_addr()
.unwrap();
println!("Connection from {}", ip);
let message: Message = Message::text("SERVER: Connected.".to_string());
client.send_message(&message).unwrap();
let (mut sender, mut receiver) = client.split();
let(tx, rx) = mpsc::channel::<Message>();
thread::spawn(move || {
for message in receiver.incoming_messages() {
tx.send(message.unwrap()).unwrap();
}
});
loop {
if let Ok(message) = rx.try_recv() {
match message.opcode {
Type::Close => {
let message = Message::close();
sender.send_message(&message).unwrap();
println!("Client {} disconnected", ip);
return;
},
Type::Ping => {
let message = Message::pong(message.payload);
sender.send_message(&message).unwrap();
},
_ => {
let payload_bytes = &message.payload;
let payload_string = match str::from_utf8(payload_bytes) {
Ok(v) => v,
Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
};
let msg_string = format!("MESSAGE: {}: ", payload_string);
dispatcher.send(msg_string).unwrap();
}
}
}
if let Ok(message) = client_rx.try_recv() {
let message: Message = Message::text(message);
sender.send_message(&message).unwrap();
}
}
});
}
}
http://pastebin.com/H9McWLrH