UdpFramed with Actix Rust。无法使用接收器写入发送消息



我正在尝试使用Actix编写一个Udp客户端参与者。我已经使用了这个例子UDP Echo,但我似乎无法使用UdpFramed tokio结构向服务器发送消息。这是我到目前为止所拥有的,这是Udp客户端参与者实现

use std::collections::HashMap;
use std::net::{SocketAddr};
use actix_rt::net::UdpSocket;
use actix::{Actor, Addr, AsyncContext, Context, Handler, StreamHandler, Message};
use actix::io::SinkWrite;
use actix_web::web::{Bytes, BytesMut};
use futures_util::stream::{SplitSink};
use futures_util::StreamExt;
use log::info;
use serde_json::Value;
use tokio_util::codec::BytesCodec;
use tokio_util::udp::UdpFramed;
use crate::rosclient::messages::Subscribe;
use std::io::Result;
mod messages;

type SinkItem = (Bytes, SocketAddr);
type UdpSink = SplitSink<UdpFramed<BytesCodec, UdpSocket>, SinkItem>;
pub struct UdpClientActor {
    pub address: SocketAddr,
    pub sink: SinkWrite<SinkItem, UdpSink>,
}
impl UdpClientActor {
    pub fn start(udp: UdpSocket, address: SocketAddr) -> Addr<UdpClientActor> {
        let framed = UdpFramed::new(udp, BytesCodec::new());
        let (split_sink, split_stream) = framed.split();
        UdpClientActor::create(|ctx| {
            ctx.add_stream(split_stream.filter_map(
                |item: Result<(BytesMut, SocketAddr)>| async {
                    item.map(|(data, sender)| UdpPacket(data, sender)).ok()
                },
            ));
            UdpClientActor {
                address,
                sink: SinkWrite::new(split_sink, ctx),
            }
        })
    }
}
impl Actor for UdpClientActor {
    type Context = Context<Self>;
    fn started(&mut self, ctx: &mut Self::Context) {
        let mut hashmap = HashMap::new();
        hashmap.insert(String::from("topic"), Value::String(String::from("/client_count")));
        let subscription = Subscribe {
            id: Default::default(),
            op: "subscribe".to_string(),
            extra: hashmap
        };
        ctx.notify(subscription);
    }
}
#[derive(Message)]
#[rtype(result = "()")]
struct UdpPacket(BytesMut, SocketAddr);
impl StreamHandler<UdpPacket> for
UdpClientActor {
    fn handle(&mut self, item: UdpPacket, _ctx: &mut Self::Context) {
        println!("Received: ({:?}, {:?})", item.0, item.1);
        self.sink.write((item.0.into(), item.1)).unwrap();
    }
}
impl actix::io::WriteHandler<std::io::Error> for UdpClientActor {}
impl Handler<Subscribe> for UdpClientActor {
    type Result = ();
    fn handle(&mut self, msg: Subscribe, _ctx: &mut Self::Context) -> Self::Result {
        let js = serde_json::json!(msg).to_string();
        let _ = self.sink.write((Bytes::from(msg.to_string()), self.address));
        info!("Subscribing to topic {}", js);
    }
}

我的主函数创建udp套接字并生成actor。

fn main() {
    ////////////////////////////////////////////////////////////////////////////
   
    let fut = async {
        ////////////////////////////////////////////////////////////////////////////
        /////////// UDP_ACTOR
        let sock = tokio::net::UdpSocket::bind("0.0.0.0:9091").await.unwrap();
        let remote_addr = "172.30.89.169:9091".parse::<SocketAddr>().unwrap();
        // let message = b"{ "op": "subscribe", "topic": "/client_count"}";
        let _ = sock.connect(remote_addr).await;
        // sock.send(message).await.unwrap();
        let _udp_client = UdpClientActor::start(sock, remote_addr);
    };
    actix_rt::Arbiter::new().spawn(fut);
    // system.block_on(fut);
    system.run().unwrap();
}

如果我删除上的评论

let message = b"{ "op": "subscribe", "topic": "/client_count"}";

sock.send(message).await.unwrap();

我至少可以检查一下服务器是否真的可以接收消息。所以我知道问题一定出在我对actor的实现上。我确实有另一个使用LinesCodec而不是BytesCodec的,它遵循完全相同的实现。唯一的区别是SinkWrite变成了这样:

SinkWrite<(String, SocketAddr), SplitSink<UdpFramed<codec::LinesCodec>,
        (String, SocketAddr)>>

这是我的Cargo.toml供参考。

[package]
name = "local_websocket_client"
version = "0.1.0"
edition = "2018"
[dependencies]
actix="0.12"
actix-codec = "0.4"
actix-rt = "2.5"
bytestring = "1.0"
serde = {version="1.0", features=["serde_derive"]}
log = "0.4"
env_logger = "0.9.0"
chrono = "0.4"
dashmap = "4.0"
futures = "0.3"
openssl = "0.10"
tokio = { version = "1", features = ["full"] }
actix-web = "4.0.0-beta.15"
futures-util = "0.3"
tokio-util = { version="0.6", features=["net", "codec"] }
tokio-udp = "0.1.6"
bytes= { version="0.6", features=["serde"] }
[dependencies.awc]
features = ["openssl"]
version = "3.0.0-beta.9"
[dependencies.serde_json]
features = ["default"]
version = "1.0"
[dependencies.uuid]
features = ["v4", "serde", "v5"]
version = "0.8"

那里有一些额外的板条箱,因为我在同一个应用程序上运行另外两个websocket客户端。

我真的很感激在这件事上能得到一些帮助。谢谢

通过将UdpSocket封装在Arc中并将引用保留在actor中以备将来使用来解决此问题。使用套接字编写消息是可行的。流处理程序使用的分割流不需要更改,因为它按预期工作。

最新更新