我遇到一种情况,我需要使用自定义调度程序来运行任务(这些必须是任务(,并且调度程序没有设置同步上下文(因此我没有收集ObserveOn
、SubscribeOn
、SynchronizationContextScheduler
等(。以下是我最终是如何做到的。现在,我想知道,我不确定这是否是执行异步调用并等待其结果的最合适方式。这是没问题还是有更健壮或更惯用的方法?
var orleansScheduler = TaskScheduler.Current;
var someObservable = ...;
someObservable.Subscribe(i =>
{
Task.Factory.StartNew(async () =>
{
return await AsynchronousOperation(i);
}, CancellationToken.None, TaskCreationOptions.None, orleansScheduler);
});
如果不需要等待怎么办?
<编辑:>我找到了一个具体的例子,一个简化的例子来说明我在这里所做的事情。基本上我在奥尔良使用 Rx,上面的代码是我正在做的事情的基本说明。虽然我也对这种情况感兴趣。
最终代码事实证明,这在奥尔良的背景下有点棘手。我不明白我如何使用ObserveOn
,这正是我想使用的东西。问题是通过使用它,Subscribe
永远不会被调用。代码:
var orleansScheduler = TaskScheduler.Current;
var factory = new TaskFactory(orleansScheduler);
var rxScheduler = new TaskPoolScheduler(factory);
var someObservable = ...;
someObservable
//.ObserveOn(rxScheduler) This doesn't look like useful since...
.SelectMany(i =>
{
//... we need to set the custom scheduler here explicitly anyway.
//See Async SelectMany at http://log.paulbetts.org/rx-and-await-some-notes/.
//Doing the "shorthand" form of .SelectMany(async... would call Task.Run, which
//in turn runs always on .NET ThreadPool and not on Orleans scheduler and hence
//the following .Subscribe wouldn't be called.
return Task.Factory.StartNew(async () =>
{
//In reality this is an asynchronous grain call. Doing the "shorthand way"
//(and optionally using ObserveOn) would get the grain called, but not the
//following .Subscribe.
return await AsynchronousOperation(i);
}, CancellationToken.None, TaskCreationOptions.None, orleansScheduler).Unwrap().ToObservable();
})
.Subscribe(i =>
{
Trace.WriteLine(i);
});
此外,指向 Codeplex 奥尔良论坛中相关线程的链接。
我强烈建议不要对任何现代代码进行StartNew
。它确实有一个用例,但非常罕见。
如果您必须使用自定义任务计划程序,我建议将 ObserveOn
与由计划程序周围的TaskFactory
包装器构造的TaskPoolScheduler
一起使用。这很拗口,所以这是一般的想法:
var factory = new TaskFactory(customScheduler);
var rxScheduler = new TaskPoolScheduler(factory);
someObservable.ObserveOn(rxScheduler)...
然后,您可以使用 SelectMany
在源流中的每个事件到达时为其启动异步操作。
另一种不太理想的解决方案是将async void
用于订阅"事件"。这是可以接受的,但您必须注意错误处理。作为一般规则,不允许异常从异步 void 方法传播。
还有第三种选择,将可观察量挂接到 TPL 数据流块中。像 ActionBlock
这样的块可以指定其任务计划程序,数据流自然可以理解异步处理程序。请注意,默认情况下,数据流块将一次将处理限制为单个元素。
一般来说,与其订阅执行,不如将任务参数投影到任务执行中并仅为结果订阅更好/更惯用。这样,您就可以与下游的Rx进行合成。
例如,给定一个随机任务,例如:
static async Task<int> DoubleAsync(int i, Random random)
{
Console.WriteLine("Started");
await Task.Delay(TimeSpan.FromSeconds(random.Next(10) + 1));
return i * 2;
}
然后你可以做:
void Main()
{
var random = new Random();
// stream of task parameters
var source = Observable.Range(1, 5);
// project the task parameters into the task execution, collect and flatten results
source.SelectMany(i => DoubleAsync(i, random))
// subscribe just for results, which turn up as they are done
// gives you flexibility to continue the rx chain here
.Subscribe(result => Console.WriteLine(result),
() => Console.WriteLine("All done."));
}