Am在C#中使用反应扩展来执行一些计算。以下是我的代码到目前为止的样子。我试着把代码包装起来,这样我就可以在我的计算方法中执行一系列任务时显示进度
这是可观察到的
IObservable<ResultWithProgress<SampleResult>> Calculate(){
return Observable.Create<ResultWithProgress<SampleResult>>(obs => {
var someTask = DoSomeTask1();
obs.OnNext(new ResultWithProgress(){Progress = 25, ProgressText ="Completed Task1"});
var someOtherTask = DoSomeMoreTask();
obs.OnNext(new ResultWithProgress(){Progress = 50, ProgressText ="Completed Task2"});
var calcResult = DoSomeMoreTask2();
obs.OnNext(new ResultWithProgress(){Progress = 75, ProgressText = "Completed Task3"});
var calcResult = FinalCalc();
obs.OnNext(new ResultWithProgress(){Progress = 100, ProgressText ="Completed Task4", Result = calcResult});
obs.OnCompleted();
}
}
结果类包装进度和结果
class ResultWithProgress<T>{
public int Progress {get; set;}
public Result T {get; set;}
public string ProgressText {get; set;}
}
包含最终结果的Result对象类SampleResult{}
用法:
Calculate().Subscribe(resultWithProgress => {
if(resultWithProgress.Result == null) //Show progress using resultWithProgress.Progress
else // get the result
})
不知怎么的,我觉得这可能不是最好的方法。我觉得很多次在没有Result的情况下创建ResultWithProgress对象似乎是一种代码气味,尤其是当我有10个以上的任务要在Calculate()中完成时
如果你能给我一些关于如何使用这个的建议,我将不胜感激,或者我处理这个问题是错误的吗?
这个答案使用了Enigmativity的答案所讨论的相同原则。
此版本使用Create
的异步重载。
它还利用了。NET 4.5 IProgress而不是原始Action<T>
来报告进度。
struct CalculationProgress
{
public int Progress { get; private set; }
public string ProgressText { get; private set; }
public CalculationProgress(int progress, string progressText)
: this()
{
Progress = progress;
ProgressText = progressText;
}
}
public IObservable<Result> Calculate(IProgress<CalculationProgress> progress)
{
return Observable.Create<Result>((observer, cancellationToken) =>
{
// run the work on a background thread
// so we do not block the subscriber
// and thus the subscriber has a chance
// to unsubscribe (and cancel the work if desired)
return Task.Run(() =>
{
DoSomeTask1();
cancellationToken.ThrowIfCancellationRequested();
progress.Report(new CalculationProgress(25, "First task"));
DoSomeTask2();
cancellationToken.ThrowIfCancellationRequested();
progress.Report(new CalculationProgress(50, "Second task"));
DoSomeTask3();
cancellationToken.ThrowIfCancellationRequested();
progress.Report(new CalculationProgress(75, "third task"));
var result = DoFinalCalculation();
cancellationToken.ThrowIfCancellationRequested();
progress.Report(new CalculationProgress(100, "final task"));
observer.OnNext(result);
}, cancellationToken);
});
}
我花了一些时间才真正运行您的代码。有许多语法错误,但最重要的是,您的Observable.Create
没有返回值。
Observable.Create
应该创建一个obs
变量订阅的可观察对象,然后返回该IDisposable
。这样一来,订阅者就可以在可观察到的内容完成之前终止它。
您的observable直接与obs
交互,并在Observable.Create
完成之前最终调用obs.OnComplete()
。这意味着主叫订户没有机会终止计算,因为它在订阅完成之前就已经完成了!
您需要的是一种在Observable.Create
中构建可观察对象的方法,以使其正常工作。
现在,由于您试图在计算过程中返回进度,您预计会产生副作用。因此,在一开始注入状态更容易,否则只需要有一个纯粹的可观察状态。
以下是我可以做这件事的方法。
首先,我将Calculate
的签名更改为:
IObservable<string> Calculate(Action<ResultWithProgress<string>> progress)
现在我正在注入一个行动,我可以用来报告我的进展。
以下是对Calculate
的调用:
Calculate(rwp => Console.WriteLine(rwp)).Subscribe(result => { });
现在是完整的Calculate
方法:
public IObservable<string> Calculate(Action<ResultWithProgress<string>> progress)
{
return Observable.Create<string>(obs =>
{
// This action just removes duplication from the query below
// and has the purpose of safely calling `progress`
Action<int, string, string> report = (pv, r, pt) =>
{
var p = progress;
if (p != null)
{
p(new ResultWithProgress<string>()
{
Progress = pv,
Result = r,
ProgressText = pt,
});
}
};
var query =
from someTask in Observable.Start(() => DoSomeTask1())
.Do(x => report(25, x, "Completed Task1"))
from someOtherTask in Observable.Start(() => DoSomeMoreTask())
.Do(x => report(50, x, "Completed Task2"))
from calcResultX in Observable.Start(() => DoSomeMoreTask2())
.Do(x => report(75, x, "Completed Task3"))
from calcResult in Observable.Start(() => DoSomeTask1())
.Do(x => report(100, x, "Completed Task4"))
select calcResult;
return query.Subscribe(obs);
});
}