Rxx库有一些扩展方法对这类操作非常有用。在1.3版本的StreamExtension.cs中,有
我正在使用Rx连接到套接字并接收数据。我遇到的问题是,在连接到套接字后,我需要发送数据进行授权,然后才能接收数据。
连接
public static IObservable<Unit> WhenConnected(this Socket socket, IPAddress address, int port)
{
return Observable.FromAsyncPattern<IPAddress, int>(
socket.BeginConnect,
socket.EndConnect)(address, port);
}
接收数据
不会发布所有代码,但总的来说,它使用TakeWhile从Begin/End receive异步方法重复接收字节。
var receiveData = Observable.FromAsyncPattern
<byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);
问题1:
现在我的问题是如何构建我的订阅?
var query = from _ in socket.WhenConnected(IPAddress.Parse(_host), _port)
//need to authorize before receiving data
from value socket.DataReceived().Repeat()
select value;
using (query.Subscribe(...
问题2
通常我会使用NetworkStream而不是Socket发送信息包,所以我需要做一些完全不同的事情吗?
public static IObservable<byte[]> ReadObservable(this Stream stream, int count)
和
public static IObservable<Unit> WriteObservable(this Stream stream, byte[] buffer, int offset, int count)
其可以在网络流上使用。我不确定以下是否是最好的方法,但它可以设置如下:
NetworkStream networkStream = null;
var setupStreamPlan = Observable
.When(WhenConnected(...))
.Then(_ => networkStream = new NetworkStream(socket));
var authorizePlan = Observable
.When(setupStreamPlan)
.Then(_ => networkStream.WriteObserable(/*message to write*/);
var listenPlan = Observable
.When(authorizePlan)
.Then(_ => networkStream.ReadObservable(10).Repeat());
var constantSizeMessageStream = Observable
.When(listenPlan)
.Switch();
EDIT:允许设置networkStream