RX IObserver订阅定时



我是。net中的Rx新手,但已经开始使用它,并取得了一些成功,包括一些网络通信

Very简化示例:

    IObservable<Result> SendRequest(Request request)
    {
        return Observable.Create<Result>(observer =>
        {
            NetworkComms.SendReqeust(request,
                result =>
                {
                    observer.OnNext(result);
                    observer.OnCompleted();
                });
            return Disposable.Empty;
        }).SubscribeOn(_commsScheduler);
    }

我的问题是,命令(networkcomm . sendrequest)实际上没有发送,直到调用者订阅返回的IObservable。在某些情况下,请求是"即发即弃"的命令,其结果是毫无意义的,因此调用者实际上订阅返回的IObservable是没有什么意义的。

我需要的功能是:

  1. 命令被立即发送,即使调用者从未订阅过IObservable
  2. 如果和当客户端订阅,他们将得到结果,即使他们晚订阅
  3. 该命令只发送一次,但所有订阅应该得到相同的结果。

我尝试在返回IObservable之前使用. replay (). refcount()并在内部做Subscribe()。这几乎可以工作,但是如果客户机在收到结果之后订阅(因此在完成时自动处理序列之后),则会导致再次调用订阅代码,第二次发送命令。

是否有一个简单的Rx扩展,可以为我处理这种情况,或者我需要滚动我自己的?

看起来你想使用AsyncSubject<T>:

IObservable<Result> SendRequest(Request request)
{
    var subject = new AsyncSubject<Result>();
    NetworkComms.SendReqeust(request, result =>
    {
        subject.OnNext(result);
        subject.OnCompleted();
    });
    return subject.AsObservable();
}

我应该补充一点,你想要的接口有点奇怪。如果语义是"在我调用这个方法的那一刻,发出了一个请求,并且任何想要的人都可以在稍后的时间获得响应",那么考虑只使用Task<Result>而不是IObservable<Result>

Task<Result> SendRequestAsync(Request request)
{
    var tcs = new TaskCompletionSource<Result>();
    NetworkComms.SendReqeust(request, result => tcs.SetResult(result));
    return tcs.Task;
}

此外,考虑将这个Task<Result> SendRequestAsync(Request request)方法直接放在您的NetworkComms类中。

相关内容

  • 没有找到相关文章

最新更新