可观察的方法C#



Observable.FromAsyncPattern可用于从BeginX-EndX风格的异步方法中进行可观察。

也许我误解了一些事情,但是否有类似的函数可以从新的异步风格方法(即.Stream.ReadAsync(中创建可观察的对象?

您可以使用ToObservable:从Task<T>创建IObservable<T>

using System.Reactive.Threading.Tasks;
Stream s = ...;
IObservable<int> o = s.ReadAsync(buffer, offset, count).ToObservable();

请注意,Lee的答案是正确的,但最终我所做的是使用Observable

    public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader)
    {
       return Observable.Create<Command>(async (subject, token) =>
        {

            try
            {
                while (true)
                {
                    if (token.IsCancellationRequested)
                    {
                        subject.OnCompleted();
                        return;
                    }
                    Command cmd = await reader.ReadCommandAsync();
                    subject.OnNext(cmd);
                }
            }
            catch (Exception ex)
            {
                try
                {
                    subject.OnError(ex);
                }
                catch (Exception)
                {
                    Debug.WriteLine("An exception was thrown while trying to call OnError on the observable subject -- means you're not catching exceptions everywhere");
                    throw;
                }
            }
        }).Publish();
    }

相关内容

  • 没有找到相关文章

最新更新