使用Rx订阅进度事件,并且在完成时仍然获得结果



我必须调用一个属于外部组件的方法。该方法的签名看起来像这样:

IImportedData Import(string fileName, Action<int> progress);

这个操作可能需要很长时间才能执行,所以我需要异步调用它并向用户报告进度。我正在寻找不同的方法来称呼它(Rx、TPL、ThreadPool),以找到表达和清晰的东西,但我很难想出在Rx中做到这一点的方法。

乍一看,使用Rx报告进度的想法似乎非常适合——这是一系列与进度有关的int。唯一的问题是,当操作完成时,我需要检查IImportedData以向用户显示结果。OnCompleted并不是为了这个目的,这让我走上了一条拥有一个公开两个IObservable流的类的道路,然后是一个"启动"操作的方法。

private class Importer : IObservable<int>, IObservable<IImportedData>

感觉很笨重,我相信还有一种我不知道的更好的方法。

前面会想到两件事:

  1. Task<T>IProgress<T>似乎更适合这项任务
  2. 实现IObservable<T>通常是不受欢迎的。建议在Observable上使用静态方法创建复合实例

如果你坚持Rx,我建议你看看几年前我和Rx的讨论。Jeffrey van Gogh最终推荐了一个Either<TLeft, TRight>响应,即根据消息是"进度"事件还是"结果"事件,自动路由回调。如果我再次处于那个位置,那肯定是我要走的方向。

你可以试试这样的东西:

Func<string, Action<int>, IObservable<IImportedData>> create =
    (fileName, progress) =>
        Observable.Create<IImportedData>(o =>
        {
            var foo = new Foo();
            var subject = new BehaviorSubject<int>(0);
            var d1 = subject.Subscribe(progress);
            var result = foo.Import(fileName, n => subject.OnNext(n));
            var d2 = subject
                .Where(x => x == 100)
                .Select(x => result)
                .Subscribe(o);
            return new CompositeDisposable(d1, d2);
        });

我还没有测试过,但像这样的东西应该会给你一种相对干净的Rx方式来做你想要的事情。您可能需要添加同步上下文。

相关内容

  • 没有找到相关文章

最新更新