将异步方法转换为返回 IObservable<>



我有一个async方法,它是一个长时间运行的方法,读取流,当它发现有东西时会触发事件:

public static async void GetStream(int id, CancellationToken token)

它需要一个取消令牌,因为它是在一个新任务中创建的。在内部,它在读取流时调用await

var result = await sr.ReadLineAsync()

现在,我想将其转换为一个返回IObservable<>的方法这样我就可以把它和反应扩展一起使用。根据我所读到的内容,最好的方法是使用Observable.Create,由于RX 2.0现在也支持异步,我可以使用这样的东西:

public static IObservable<Message> ObservableStream(int id, CancellationToken token)
{
    return Observable.Create<Message>(
        async (IObserver<Message> observer) =>
            {

里面的其余代码是相同的,但我调用的不是触发事件observer.OnNext()。但是,这感觉不对。首先,我将CancellationTokens混合在一起,尽管添加async关键字使其发挥了作用,但这实际上是最好的做法吗?我这样称呼我的ObservableStream:

Client.ObservableStream(555404, token).ObserveOn(Dispatcher.CurrentDispatcher).SubscribeOn(TaskPoolScheduler.Default).Subscribe(m => Messages.Add(m));

你是对的。一旦通过IObservable表示接口,就应该避免要求调用方提供CancellationToken。这并不意味着你不能在内部使用它们。Rx提供了几种机制来生成CancellationToken实例,当观察者取消订阅您的可观察对象时,这些实例将被取消。

有很多方法可以解决你的问题。最简单的方法几乎不需要对代码进行任何更改。它使用Observable.Create的过载,为您提供一个CancellationToken,如果呼叫者取消订阅,就会触发:

public static IObservable<Message> ObservableStream(int id)
{
    return Observable.Create<Message>(async (observer, token) =>
    {
         // no exception handling required.  If this method throws,
         // Rx will catch it and call observer.OnError() for us.
         using (var stream = /*...open your stream...*/)
         {
             string msg;
             while ((msg = await stream.ReadLineAsync()) != null)
             {
                 if (token.IsCancellationRequested) { return; }
                 observer.OnNext(msg);
             }
             observer.OnCompleted();
         }
    });
}

您应该更改GetStream以返回Task,而不是void(返回async void是不好的,除非绝对需要,正如svick所评论的那样)。返回Task后,只需调用.ToObservable()即可完成。

例如:

public static async Task<int> GetStream(int id, CancellationToken token) { ... }

然后,

GetStream(1, new CancellationToken(false))
   .ToObservable()
   .Subscribe(Console.Write);

相关内容

  • 没有找到相关文章

最新更新