如何在C#中使用反应式扩展时显示进度



Am在C#中使用反应扩展来执行一些计算。以下是我的代码到目前为止的样子。我试着把代码包装起来,这样我就可以在我的计算方法中执行一系列任务时显示进度

这是可观察到的

IObservable<ResultWithProgress<SampleResult>> Calculate(){
  return Observable.Create<ResultWithProgress<SampleResult>>(obs => {
     var someTask = DoSomeTask1();
     obs.OnNext(new ResultWithProgress(){Progress = 25, ProgressText     ="Completed Task1"});
     var someOtherTask = DoSomeMoreTask();
     obs.OnNext(new ResultWithProgress(){Progress = 50, ProgressText ="Completed Task2"});
     var calcResult = DoSomeMoreTask2();
     obs.OnNext(new ResultWithProgress(){Progress = 75, ProgressText = "Completed Task3"});
     var calcResult = FinalCalc();
     obs.OnNext(new ResultWithProgress(){Progress = 100, ProgressText ="Completed Task4", Result = calcResult});
     obs.OnCompleted();
  }
}

结果类包装进度和结果

class ResultWithProgress<T>{
 public int Progress {get; set;}
 public Result T {get; set;}
 public string ProgressText {get; set;}
}

包含最终结果的Result对象类SampleResult{}

用法:

Calculate().Subscribe(resultWithProgress => {
  if(resultWithProgress.Result == null) //Show progress using   resultWithProgress.Progress
  else // get the result
})

不知怎么的,我觉得这可能不是最好的方法。我觉得很多次在没有Result的情况下创建ResultWithProgress对象似乎是一种代码气味,尤其是当我有10个以上的任务要在Calculate()中完成时

如果你能给我一些关于如何使用这个的建议,我将不胜感激,或者我处理这个问题是错误的吗?

这个答案使用了Enigmativity的答案所讨论的相同原则。

此版本使用Create的异步重载。

它还利用了。NET 4.5 IProgress而不是原始Action<T>来报告进度。

struct CalculationProgress
{
    public int Progress { get; private set; }
    public string ProgressText { get; private set; }
    public CalculationProgress(int progress, string progressText)
        : this()
    {
        Progress = progress;
        ProgressText = progressText;
    }
}
public IObservable<Result> Calculate(IProgress<CalculationProgress> progress)
{
    return Observable.Create<Result>((observer, cancellationToken) =>
    {
        // run the work on a background thread
        // so we do not block the subscriber
        // and thus the subscriber has a chance
        // to unsubscribe (and cancel the work if desired)
        return Task.Run(() =>
        {
            DoSomeTask1();
            cancellationToken.ThrowIfCancellationRequested();
            progress.Report(new CalculationProgress(25, "First task"));
            DoSomeTask2();
            cancellationToken.ThrowIfCancellationRequested();
            progress.Report(new CalculationProgress(50, "Second task"));
            DoSomeTask3();
            cancellationToken.ThrowIfCancellationRequested();
            progress.Report(new CalculationProgress(75, "third task"));
            var result = DoFinalCalculation();            
            cancellationToken.ThrowIfCancellationRequested();
            progress.Report(new CalculationProgress(100, "final task"));
            observer.OnNext(result);
        }, cancellationToken);
    });
}

我花了一些时间才真正运行您的代码。有许多语法错误,但最重要的是,您的Observable.Create没有返回值。

Observable.Create应该创建一个obs变量订阅的可观察对象,然后返回该IDisposable。这样一来,订阅者就可以在可观察到的内容完成之前终止它。

您的observable直接与obs交互,并在Observable.Create完成之前最终调用obs.OnComplete()。这意味着主叫订户没有机会终止计算,因为它在订阅完成之前就已经完成了!

您需要的是一种在Observable.Create中构建可观察对象的方法,以使其正常工作。

现在,由于您试图在计算过程中返回进度,您预计会产生副作用。因此,在一开始注入状态更容易,否则只需要有一个纯粹的可观察状态。

以下是我可以做这件事的方法。

首先,我将Calculate的签名更改为:

IObservable<string> Calculate(Action<ResultWithProgress<string>> progress)

现在我正在注入一个行动,我可以用来报告我的进展。

以下是对Calculate的调用:

Calculate(rwp => Console.WriteLine(rwp)).Subscribe(result => { });

现在是完整的Calculate方法:

public IObservable<string> Calculate(Action<ResultWithProgress<string>> progress)
{
    return Observable.Create<string>(obs =>
    {
        // This action just removes duplication from the query below
        // and has the purpose of safely calling `progress`
        Action<int, string, string> report = (pv, r, pt) =>
        {
            var p = progress;
            if (p != null)
            {
                p(new ResultWithProgress<string>()
                {
                    Progress = pv,
                    Result = r,
                    ProgressText = pt,
                });
            }
        };
        var query =
            from someTask in Observable.Start(() => DoSomeTask1())
                .Do(x => report(25, x, "Completed Task1"))
            from someOtherTask in Observable.Start(() => DoSomeMoreTask())
                .Do(x => report(50, x, "Completed Task2"))
            from calcResultX in Observable.Start(() => DoSomeMoreTask2())
                .Do(x => report(75, x, "Completed Task3"))
            from calcResult in Observable.Start(() => DoSomeTask1())
                .Do(x => report(100, x, "Completed Task4"))
            select calcResult;
        return query.Subscribe(obs);
    });
}

相关内容

  • 没有找到相关文章

最新更新