使用 rust-websocket 聊天



我正在尝试使用 Rust-Websocket 来创建一个简单的聊天室,多人可以在其中相互交谈。

我查看了示例,"server.rs"和"websockets.html对我来说似乎是一个不错的起点。所以我只是尝试启动它并从网络连接。一切正常,但我只能与自己通信,而不能与其他连接通信(因为它将消息直接发送回sender而不是每个连接)。

所以我试图获得一个包含所有senders/clients的向量,这样我就可以迭代它们并将消息发送给每个向量,但这似乎是有问题的。我无法传达senderclient,因为它不是线程安全的,我也无法复制其中任何一个。

不确定我是否只是不了解整个借款 100%,或者它是否不打算像这样进行交叉连接通信。

server.rs:
https://github.com/cyderize/rust-websocket/blob/master/examples/server.rs

网络套接字.html:
https://github.com/cyderize/rust-websocket/blob/master/examples/websockets.html

我可能从错误的方向接近这一点。与所有其他线程共享收到的消息可能更容易。我想了一下,但我唯一能想到的就是使用 channels 从线程内部向外部发送消息。有没有办法直接在线程之间广播消息?我需要做的就是将字符串从一个线程发送到另一个线程。

所以这并不像人们想象的那么简单。

基本上,我使用了一个调度程序线程,它将充当所有连接客户端的控制中心。因此,每当客户端收到消息时,它都会发送到调度程序,然后将消息分发到每个连接的客户端。

我还必须在另一个线程中接收消息,因为在 rust-websocket 中没有非阻塞方式接收消息。然后,我只能使用一个永久循环来检查从 websocket 和调度程序接收的新消息。

以下是我的代码最终的样子:

extern crate websocket;
use std::str;
use std::sync::{Arc, Mutex};
use std::sync::mpsc;
use std::thread;
use websocket::{Server, Message, Sender, Receiver};
use websocket::header::WebSocketProtocol;
use websocket::message::Type;

fn main() {
    let server = Server::bind("0.0.0.0:2794").unwrap();
    let (dispatcher_tx, dispatcher_rx) = mpsc::channel::<String>();
    let client_senders: Arc<Mutex<Vec<mpsc::Sender<String>>>> = Arc::new(Mutex::new(vec![]));
    // dispatcher thread
    {
        let client_senders = client_senders.clone();
        thread::spawn(move || {
            while let Ok(msg) = dispatcher_rx.recv() {
                for sender in client_senders.lock().unwrap().iter() {
                    sender.send(msg.clone()).unwrap();
                }
            }
        });
    }
    // client threads
    for connection in server {
        let dispatcher = dispatcher_tx.clone();
        let (client_tx, client_rx) = mpsc::channel();
        client_senders.lock().unwrap().push(client_tx);
        // Spawn a new thread for each connection.
        thread::spawn(move || {
            let request = connection.unwrap().read_request().unwrap(); // Get the request
            let headers = request.headers.clone(); // Keep the headers so we can check them
            request.validate().unwrap(); // Validate the request
            let mut response = request.accept(); // Form a response
            if let Some(&WebSocketProtocol(ref protocols)) = headers.get() {
                if protocols.contains(&("rust-websocket".to_string())) {
                    // We have a protocol we want to use
                    response.headers.set(WebSocketProtocol(vec!["rust-websocket".to_string()]));
                }
            }
            let mut client = response.send().unwrap(); // Send the response
            let ip = client.get_mut_sender()
                .get_mut()
                .peer_addr()
                .unwrap();
            println!("Connection from {}", ip);
            let message: Message = Message::text("SERVER: Connected.".to_string());
            client.send_message(&message).unwrap();
            let (mut sender, mut receiver) = client.split();
            let(tx, rx) = mpsc::channel::<Message>();
            thread::spawn(move || {
                for message in receiver.incoming_messages() {
                    tx.send(message.unwrap()).unwrap();
                }
            });
            loop {
                if let Ok(message) = rx.try_recv() {
                    match message.opcode {
                        Type::Close => {
                            let message = Message::close();
                            sender.send_message(&message).unwrap();
                            println!("Client {} disconnected", ip);
                            return;
                        },
                        Type::Ping => {
                            let message = Message::pong(message.payload);
                            sender.send_message(&message).unwrap();
                        },
                        _ => {
                            let payload_bytes = &message.payload;
                            let payload_string = match str::from_utf8(payload_bytes) {
                                Ok(v) => v,
                                Err(e) => panic!("Invalid UTF-8 sequence: {}", e),
                            };
                            let msg_string = format!("MESSAGE: {}: ", payload_string);
                            dispatcher.send(msg_string).unwrap();
                        }
                    }
                }
                if let Ok(message) = client_rx.try_recv() {
                    let message: Message = Message::text(message);
                    sender.send_message(&message).unwrap();
                }
            }
        });
    }
}

http://pastebin.com/H9McWLrH

相关内容

  • 没有找到相关文章

最新更新