IScheduler 接口提供
public static IDisposable Schedule(this IScheduler scheduler, Action action)
和
public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, System.Threading.Tasks.Task<IDisposable>> action)
计划异步的方法说明:
// Summary:
// Schedules work using an asynchronous method, allowing for cooperative scheduling
// in an imperative coding style.
//
// Parameters:
// scheduler:
// Scheduler to schedule work on.
//
// action:
// Asynchronous method to run the work, using Yield and Sleep operations for
// cooperative scheduling and injection of cancellation points.
//
// Returns:
// Disposable object that allows to cancel outstanding work on cooperative cancellation
// points or through the cancellation token passed to the asynchronous method.
//
// Exceptions:
// System.ArgumentNullException:
// scheduler or action is null.
有人可以解释这两种方法之间的区别吗?
何时应使用 ScheduleAsync?
我应该何时使用计划?
允许以命令式编码样式进行协作调度意味着什么?
谢谢。
前进
这个答案是基于 Rx 团队在这篇文章中直接给出的明确解释 - 注意它很长,涵盖的不仅仅是这一点。转到标题为在 Rx 查询运算符中利用"异步"的部分,并对所有内容进行了解释,包括标题为"使调度程序更易于使用"await"部分中有关ScheduleAsyc
的特定示例
这是我试图解释的:
总结
ScheduleAsync
的主要动机是采用 C# 5 的异步/等待功能,以简化编写代码,这些代码对许多事件执行"公平"调度,否则可能会导致调度程序缺乏其他操作。这就是"协作调度"的含义 - 与其他共享调度程序的代码很好地配合使用。为此,您可以计划下一个事件,然后在该事件触发之前产生控制权,并挂接到该事件以计划下一个事件,依此类推。
在 Rx 2.0 之前,这是通过递归调度实现的。
幼稚的例子
下面是链接文章中的示例,该示例提供了 Range 运算符的实现。此实现很差,因为它通过不产生控制来使调度程序匮乏:
static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
return Observable.Create<int>(observer =>
{
return scheduler.Schedule(() =>
{
for (int i = 0; i < count; i++)
{
Console.WriteLine("Iteration {0}", i);
observer.OnNext(start + i);
}
observer.OnCompleted();
});
});
}
请注意 OnNext 如何位于一个循环中,在不产生控制的情况下锤击调度程序(如果调度程序是单线程的,则尤其糟糕)。它使其他操作没有机会安排其操作,并且不允许在取消的情况下中止。我们该如何解决这个问题?
递归调度 - Rx 2.0 之前的解决方案
这是通过递归调度解决这个问题的旧方法 - 很难看到发生了什么。这不是"命令式编码风格"。递归调用self()
在你第一次看到它时是非常令人脑洞大开的不透明 - 在我的情况下是第十个,尽管我最终得到了它。传奇人物巴特·德·斯梅特(Bart de Smet)的这篇经典文章将告诉您比您需要了解的更多有关此技术的信息。无论如何,这是递归样式:
static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
return Observable.Create<int>(observer =>
{
return scheduler.Schedule(0, (i, self) =>
{
if (i < count)
{
Console.WriteLine("Iteration {0}", i);
observer.OnNext(start + i);
self(i + 1); /* Here is the recursive call */
}
else
{
observer.OnCompleted();
}
});
});
}
除了更公平之外,如果订阅被释放,下一个挂起的计划操作将被取消。
新的异步/等待样式
这是通过 async/await 的编译器转换进行延续的新方法,它允许"命令式编码风格"。请注意,与递归样式相比,动机是更大的可读性 - async/await 突出地以通常与 .NET 惯用的方式显示正在发生的事情:
static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
return Observable.Create<int>(observer =>
{
return scheduler.ScheduleAsync(async (ctrl, ct) =>
{
for (int i = 0; i < count; i++)
{
Console.WriteLine("Iteration {0}", i);
observer.OnNext(i);
await ctrl.Yield(); /* Use a task continuation to schedule next event */
}
observer.OnCompleted();
return Disposable.Empty;
});
});
}
它看起来就像一个for循环,但实际上await ctrl.Yield()
将产生控制,允许其他代码进入调度程序。它使用 Task 延续一次只调度一个事件 - 也就是说,每次迭代仅在前一次迭代完成后发布到调度程序,从而避免直接在调度程序上排长队。取消也有效,这次 Rx 框架将订阅的处置转换为通过 ct
传入的取消令牌。
如果链接仍然很好,我建议阅读我从中获取的原始帖子!