我必须调用一个属于外部组件的方法。该方法的签名看起来像这样:
IImportedData Import(string fileName, Action<int> progress);
这个操作可能需要很长时间才能执行,所以我需要异步调用它并向用户报告进度。我正在寻找不同的方法来称呼它(Rx、TPL、ThreadPool),以找到表达和清晰的东西,但我很难想出在Rx中做到这一点的方法。
乍一看,使用Rx报告进度的想法似乎非常适合——这是一系列与进度有关的int。唯一的问题是,当操作完成时,我需要检查IImportedData
以向用户显示结果。OnCompleted并不是为了这个目的,这让我走上了一条拥有一个公开两个IObservable流的类的道路,然后是一个"启动"操作的方法。
private class Importer : IObservable<int>, IObservable<IImportedData>
感觉很笨重,我相信还有一种我不知道的更好的方法。
前面会想到两件事:
Task<T>
和IProgress<T>
似乎更适合这项任务- 实现
IObservable<T>
通常是不受欢迎的。建议在Observable
上使用静态方法创建复合实例
如果你坚持Rx,我建议你看看几年前我和Rx的讨论。Jeffrey van Gogh最终推荐了一个Either<TLeft, TRight>
响应,即根据消息是"进度"事件还是"结果"事件,自动路由回调。如果我再次处于那个位置,那肯定是我要走的方向。
你可以试试这样的东西:
Func<string, Action<int>, IObservable<IImportedData>> create =
(fileName, progress) =>
Observable.Create<IImportedData>(o =>
{
var foo = new Foo();
var subject = new BehaviorSubject<int>(0);
var d1 = subject.Subscribe(progress);
var result = foo.Import(fileName, n => subject.OnNext(n));
var d2 = subject
.Where(x => x == 100)
.Select(x => result)
.Subscribe(o);
return new CompositeDisposable(d1, d2);
});
我还没有测试过,但像这样的东西应该会给你一种相对干净的Rx方式来做你想要的事情。您可能需要添加同步上下文。