如何使用套接字和响应式扩展(Rx)从连接的客户端套接字获取接收到的消息缓冲区



因为我对Rx有点陌生,并且正在学习它。我查了很多例子,但没有一个符合我的需要。

场景:我有一个套接字服务器套接字(使用简单的套接字对象而不是TCPListener对象创建)。我有多个客户端(客户端套接字,而不是TCPClient)连接到这个服务器套接字。我试图使用Rx(响应式扩展)通过使用"FromAsyncPattern"操作符进行BeginReceive和EndReceive异步操作来获取客户端套接字发送的消息。

我几乎已经开始在缓冲区中获取消息。但问题是,缓冲区返回的要么是从前一个操作接收到的相同消息,要么是当前和前一个接收操作的混合内容。

下面是使用的代码:
    private void ReceiveMessageFromClient(Socket socket)
    {
        try
        {
            if (socket != null)
            {
                byte[] buffer = new byte[400];
                var functionReceiveSocketData = Observable.FromAsyncPattern
                                                            <byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);
                Observable.Defer(() =>
                                functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
                                ).Repeat().Subscribe(onNext: x => OnMessageReceived(buffer));
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex.Message);
        }
    }

onmessagerreceived方法只是一个委托,用于引发消息接收事件并将缓冲区发送给其他类以进一步处理。

问题:每次我收到消息时都使用相同的缓冲区,我如何从以前收到的缓冲区中获得收到的消息。

——如果我收到了部分消息,而下一条消息实际上是同一消息的一部分,该怎么办?

请帮助我与一些代码片段上面的问题,这将是伟大的。张贴这个问题的原因是,我所经历的实现到目前为止正在使用TCPListener,这里的约束是使用套接字对象:(

你遇到的问题是,你在重复调用你的可观察对象之间共享同一个缓冲区实例。您需要确保每次都有一个新的缓冲区实例。似乎还应该根据返回的整数截断缓冲区。以下是我建议在您的if声明中需要包含的内容:

var functionReceiveSocketData =
    Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
        (socket.BeginReceive, socket.EndReceive);
Func<byte[], int, byte[]> copy = (bs, n) =>
{
    var rs = new byte[n];
    bs.CopyTo(rs, 0);
    return rs;
};
Observable
    .Defer(() =>
    {
        var buffer = new byte[400];
        return 
            from n in functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
            select copy(buffer, n);
    })
    .Repeat()
    .Subscribe(x => OnMessageReceived(x));

编辑:回复关于加入消息部分的评论。还固定了上面的解决方案-在copy lambda的CopyTo函数中应该是0而不是n

我不是使用套接字的专家,但是,因为套接字只是一个字节流(如果我错了请纠正我),您需要能够确定流中的每个消息何时完成。

这里有一种可能的方法:

/* include `functionReceiveSocketData` & `copy` from above */ =
Func<byte[], byte[], byte[]> append = (bs1, bs2) =>
{
    var rs = new byte[bs1.Length + bs2.Length];
    bs1.CopyTo(rs, 0);
    bs2.CopyTo(rs, bs1.Length);
    return rs;
};
var messages = new Subject<byte[]>();
messages.Subscribe(x => OnMessageReceived(x));
Observable
    .Defer(() => /* as above */)
    .Repeat()
    .Scan(new byte[] { }, (abs, bs) =>
    {
        var current = append(abs, bs);
        if (isCompleteMessage(current))
        {
            messages.OnNext(current);
            current = new byte[] { };
        }
        return current;
    })
    .Subscribe();

你唯一需要做的就是弄清楚如何实现isCompleteMessage函数

相关内容

  • 没有找到相关文章