我是。net中的Rx新手,但已经开始使用它,并取得了一些成功,包括一些网络通信
Very简化示例:
IObservable<Result> SendRequest(Request request)
{
return Observable.Create<Result>(observer =>
{
NetworkComms.SendReqeust(request,
result =>
{
observer.OnNext(result);
observer.OnCompleted();
});
return Disposable.Empty;
}).SubscribeOn(_commsScheduler);
}
我的问题是,命令(networkcomm . sendrequest)实际上没有发送,直到调用者订阅返回的IObservable。在某些情况下,请求是"即发即弃"的命令,其结果是毫无意义的,因此调用者实际上订阅返回的IObservable是没有什么意义的。
我需要的功能是:
- 命令被立即发送,即使调用者从未订阅过IObservable
- 如果和当客户端订阅,他们将得到结果,即使他们晚订阅
- 该命令只发送一次,但所有订阅应该得到相同的结果。
我尝试在返回IObservable之前使用. replay (). refcount()并在内部做Subscribe()。这几乎可以工作,但是如果客户机在收到结果之后订阅(因此在完成时自动处理序列之后),则会导致再次调用订阅代码,第二次发送命令。
是否有一个简单的Rx扩展,可以为我处理这种情况,或者我需要滚动我自己的?
看起来你想使用AsyncSubject<T>
:
IObservable<Result> SendRequest(Request request)
{
var subject = new AsyncSubject<Result>();
NetworkComms.SendReqeust(request, result =>
{
subject.OnNext(result);
subject.OnCompleted();
});
return subject.AsObservable();
}
我应该补充一点,你想要的接口有点奇怪。如果语义是"在我调用这个方法的那一刻,发出了一个请求,并且任何想要的人都可以在稍后的时间获得响应",那么考虑只使用Task<Result>
而不是IObservable<Result>
。
Task<Result> SendRequestAsync(Request request)
{
var tcs = new TaskCompletionSource<Result>();
NetworkComms.SendReqeust(request, result => tcs.SetResult(result));
return tcs.Task;
}
此外,考虑将这个Task<Result> SendRequestAsync(Request request)
方法直接放在您的NetworkComms
类中。