如何连接到TCP套接字,在接收数据之前发送数据以获得授权



我正在使用Rx连接到套接字并接收数据。我遇到的问题是,在连接到套接字后,我需要发送数据进行授权,然后才能接收数据。

连接

public static IObservable<Unit> WhenConnected(this Socket socket, IPAddress address, int port)
{
    return Observable.FromAsyncPattern<IPAddress, int>(
        socket.BeginConnect,
        socket.EndConnect)(address, port);
}

接收数据

不会发布所有代码,但总的来说,它使用TakeWhile从Begin/End receive异步方法重复接收字节。

var receiveData = Observable.FromAsyncPattern
    <byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);

问题1:

现在我的问题是如何构建我的订阅?

var query = from _ in socket.WhenConnected(IPAddress.Parse(_host), _port)
            //need to authorize before receiving data
            from value socket.DataReceived().Repeat()
            select value;
using (query.Subscribe(...

问题2

通常我会使用NetworkStream而不是Socket发送信息包,所以我需要做一些完全不同的事情吗?

Rxx库有一些扩展方法对这类操作非常有用。在1.3版本的StreamExtension.cs中,有
public static IObservable<byte[]> ReadObservable(this Stream stream, int count)

public static IObservable<Unit> WriteObservable(this Stream stream, byte[] buffer, int offset, int count)

其可以在网络流上使用。我不确定以下是否是最好的方法,但它可以设置如下:

NetworkStream networkStream = null;
var setupStreamPlan = Observable
  .When(WhenConnected(...))
  .Then(_ => networkStream = new NetworkStream(socket));
var authorizePlan = Observable
  .When(setupStreamPlan)
  .Then(_ => networkStream.WriteObserable(/*message to write*/);
var listenPlan = Observable
  .When(authorizePlan)
  .Then(_ => networkStream.ReadObservable(10).Repeat());
var constantSizeMessageStream = Observable
  .When(listenPlan)
  .Switch();

EDIT:允许设置networkStream

相关内容

  • 没有找到相关文章

最新更新