在订阅中调用 Task.Factory.StartNew(async () => {}) 通常是否可疑?



我遇到一种情况,我需要使用自定义调度程序来运行任务(这些必须是任务(,并且调度程序没有设置同步上下文(因此我没有收集ObserveOnSubscribeOnSynchronizationContextScheduler等(。以下是我最终是如何做到的。现在,我想知道,我不确定这是否是执行异步调用并等待其结果的最合适方式。这是没问题还是有更健壮或更惯用的方法?

var orleansScheduler = TaskScheduler.Current;
var someObservable = ...;
someObservable.Subscribe(i =>
{
    Task.Factory.StartNew(async () =>
    {
        return await AsynchronousOperation(i);
    }, CancellationToken.None, TaskCreationOptions.None, orleansScheduler);          
});

如果不需要等待怎么办?

<编辑:>我找到了一个具体的例子,一个简化的例子来说明我在这里所做的事情。基本上我在奥尔良使用 Rx,上面的代码是我正在做的事情的基本说明。虽然我也对这种情况感兴趣。

最终代码事实证明,这在奥尔良的背景下有点棘手。我不明白我如何使用ObserveOn,这正是我想使用的东西。问题是通过使用它,Subscribe永远不会被调用。代码:

var orleansScheduler = TaskScheduler.Current;
var factory = new TaskFactory(orleansScheduler);
var rxScheduler = new TaskPoolScheduler(factory);
var someObservable = ...;
someObservable
//.ObserveOn(rxScheduler) This doesn't look like useful since...
.SelectMany(i =>
{
    //... we need to set the custom scheduler here explicitly anyway.
    //See Async SelectMany at http://log.paulbetts.org/rx-and-await-some-notes/.
    //Doing the "shorthand" form of .SelectMany(async... would call Task.Run, which
    //in turn runs always on .NET ThreadPool and not on Orleans scheduler and hence
    //the following .Subscribe wouldn't be called. 
    return Task.Factory.StartNew(async () =>
    { 
       //In reality this is an asynchronous grain call. Doing the "shorthand way"
       //(and optionally using ObserveOn) would get the grain called, but not the
       //following .Subscribe. 
       return await AsynchronousOperation(i);
    }, CancellationToken.None, TaskCreationOptions.None, orleansScheduler).Unwrap().ToObservable();
})
.Subscribe(i =>
{
    Trace.WriteLine(i);
});

此外,指向 Codeplex 奥尔良论坛中相关线程的链接。

我强烈建议不要对任何现代代码进行StartNew。它确实有一个用例,但非常罕见。

如果您必须使用自定义任务计划程序,我建议将 ObserveOn 与由计划程序周围的TaskFactory包装器构造的TaskPoolScheduler一起使用。这很拗口,所以这是一般的想法:

var factory = new TaskFactory(customScheduler);
var rxScheduler = new TaskPoolScheduler(factory);
someObservable.ObserveOn(rxScheduler)...

然后,您可以使用 SelectMany 在源流中的每个事件到达时为其启动异步操作。

另一种不太理想的解决方案是将async void用于订阅"事件"。这是可以接受的,但您必须注意错误处理。作为一般规则,不允许异常从异步 void 方法传播。

还有第三种选择,将可观察量挂接到 TPL 数据流块中。像 ActionBlock 这样的块可以指定其任务计划程序,数据流自然可以理解异步处理程序。请注意,默认情况下,数据流块将一次将处理限制为单个元素。

一般来说,与其订阅执行,不如将任务参数投影到任务执行中并仅为结果订阅更好/更惯用。这样,您就可以与下游的Rx进行合成。

例如

,给定一个随机任务,例如:

static async Task<int> DoubleAsync(int i, Random random)
{
    Console.WriteLine("Started");
    await Task.Delay(TimeSpan.FromSeconds(random.Next(10) + 1));
    return i * 2;
}

然后你可以做:

void Main()
{
    var random = new Random();
    // stream of task parameters
    var source = Observable.Range(1, 5);
    // project the task parameters into the task execution, collect and flatten results
    source.SelectMany(i => DoubleAsync(i, random))
          // subscribe just for results, which turn up as they are done
          // gives you flexibility to continue the rx chain here
          .Subscribe(result => Console.WriteLine(result),
                    () => Console.WriteLine("All done."));
}

相关内容

  • 没有找到相关文章

最新更新