我正在尝试使用标准System.Net Socket API来侦听某些连接,并计划使用Reactive Extensions来弥合这一差距,并创建一种直观的方式来侦听上述连接。
到目前为止,这是我的代码:
public RxConnectionListener(int port, Socket socket, IScheduler scheduler)
{
_socket = socket;
// TODO: Lazy binding?
_socket.Bind(new IPEndPoint(IPAddress.Any, port));
_socket.Listen(0);
var task = Task.Factory.FromAsync(
socket.BeginAccept,
result => socket.EndAccept(result),
null);
_connections = Observable.Defer(() => Observable.FromAsync(() => task)
).Select(s => new RxConnection(s))
.ObserveOn(scheduler)
.Repeat();
}
现在,监听套接字按计划工作——我收到的连接没有问题。问题是,第一个连接被接收了不止一次(即,似乎Observable.FromAsync
正在缓存异步task
对象的结果(。我知道这显然是由于Repeat()
语句造成的,但我的印象是,将Observable.FromAsync
封装在Observable.Defer
中,然后在延迟的可观察对象上调用Repeat
会绕过缓存——我做错了什么?
订阅代码很简单:
listener
.Connections
.Subscribe(OnNewConnection);
其中listener.Connections
是由_connections
支持的名为Connections
的RxConnectionListener
实例上的属性
OnNewConnection
如下:
protected virtual void OnNewConnection(IConnection connection)
{
Console.WriteLine(connection.RemoteAddress);
}
尝试通过TCP连接一次后观察到的(双关语(输出:
::ffff:127.0.0.1
::ffff:127.0.0.1
::ffff:127.0.0.1
::ffff:127.0.0.1
..
(to infinity and beyond)
编辑:为了完整起见,我使用的是EventLoopScheduler
,尽管注释掉ObserveOn
调用没有什么区别。
通过编写
var task = Task.Factory.FromAsync(
socket.BeginAccept,
result => socket.EndAccept(result),
null);
您创建了一个连接下一个套接字的任务。如果你两次询问任务的结果,两次都会得到相同的结果。这很正常:任务总是这样运行:它运行到完成,然后"缓存"其结果(无论是正常结束还是异常(。
你想做的是创建一个创建任务的函数,比如:
Func<Task<Socket>> acceptTask = () =>
{
return Task.Factory.FromAsync(
socket.BeginAccept,
result => socket.EndAccept(result),
null);
};
现在,您可以很容易地从这个任务工厂创建一个可观察的:Observable.FromAsyn(acceptTask)
请注意,从您从异步模式创建的任务中创建可观察对象可能是个坏主意:有一些方法可以直接从模式中创建可观测对象:将从任务中创建的可观测对象保存到您想要从中创建可测对象的操作已经是任务的情况下。
我很确定Observable.FromAsync会缓存您正在构建的任务的Result属性,因此重复只会给您返回任务的结果,而不是每次都返回一个新的Observable。为了创建有效的可重复连接,每次都需要重新构建任务。这样,套接字结果将不再被缓存。
var _connections = Observable.Defer(() => Observable.FromAsync(() =>
Task.Factory.FromAsync(
socket.BeginAccept,
result => socket.EndAccept(result),
null)))
.Select(s => new RxConnection(s))
.Repeat();