因为我对Rx有点陌生,并且正在学习它。我查了很多例子,但没有一个符合我的需要。
场景:我有一个套接字服务器套接字(使用简单的套接字对象而不是TCPListener对象创建)。我有多个客户端(客户端套接字,而不是TCPClient)连接到这个服务器套接字。我试图使用Rx(响应式扩展)通过使用"FromAsyncPattern"操作符进行BeginReceive和EndReceive异步操作来获取客户端套接字发送的消息。
我几乎已经开始在缓冲区中获取消息。但问题是,缓冲区返回的要么是从前一个操作接收到的相同消息,要么是当前和前一个接收操作的混合内容。
下面是使用的代码: private void ReceiveMessageFromClient(Socket socket)
{
try
{
if (socket != null)
{
byte[] buffer = new byte[400];
var functionReceiveSocketData = Observable.FromAsyncPattern
<byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);
Observable.Defer(() =>
functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
).Repeat().Subscribe(onNext: x => OnMessageReceived(buffer));
}
}
catch (Exception ex)
{
Console.WriteLine(ex.Message);
}
}
onmessagerreceived方法只是一个委托,用于引发消息接收事件并将缓冲区发送给其他类以进一步处理。
问题:每次我收到消息时都使用相同的缓冲区,我如何从以前收到的缓冲区中获得收到的消息。
——如果我收到了部分消息,而下一条消息实际上是同一消息的一部分,该怎么办?
请帮助我与一些代码片段上面的问题,这将是伟大的。张贴这个问题的原因是,我所经历的实现到目前为止正在使用TCPListener,这里的约束是使用套接字对象:(
你遇到的问题是,你在重复调用你的可观察对象之间共享同一个缓冲区实例。您需要确保每次都有一个新的缓冲区实例。似乎还应该根据返回的整数截断缓冲区。以下是我建议在您的if
声明中需要包含的内容:
var functionReceiveSocketData =
Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>
(socket.BeginReceive, socket.EndReceive);
Func<byte[], int, byte[]> copy = (bs, n) =>
{
var rs = new byte[n];
bs.CopyTo(rs, 0);
return rs;
};
Observable
.Defer(() =>
{
var buffer = new byte[400];
return
from n in functionReceiveSocketData(buffer, 0, 400, SocketFlags.None)
select copy(buffer, n);
})
.Repeat()
.Subscribe(x => OnMessageReceived(x));
编辑:回复关于加入消息部分的评论。还固定了上面的解决方案-在copy
lambda的CopyTo
函数中应该是0
而不是n
。
我不是使用套接字的专家,但是,因为套接字只是一个字节流(如果我错了请纠正我),您需要能够确定流中的每个消息何时完成。
这里有一种可能的方法:
/* include `functionReceiveSocketData` & `copy` from above */ =
Func<byte[], byte[], byte[]> append = (bs1, bs2) =>
{
var rs = new byte[bs1.Length + bs2.Length];
bs1.CopyTo(rs, 0);
bs2.CopyTo(rs, bs1.Length);
return rs;
};
var messages = new Subject<byte[]>();
messages.Subscribe(x => OnMessageReceived(x));
Observable
.Defer(() => /* as above */)
.Repeat()
.Scan(new byte[] { }, (abs, bs) =>
{
var current = append(abs, bs);
if (isCompleteMessage(current))
{
messages.OnNext(current);
current = new byte[] { };
}
return current;
})
.Subscribe();
你唯一需要做的就是弄清楚如何实现isCompleteMessage
函数