IScheduler.Schedule vs IScheduler.ScheduleAsync?



IScheduler 接口提供

public static IDisposable Schedule(this IScheduler scheduler, Action action)

public static IDisposable ScheduleAsync(this IScheduler scheduler, Func<IScheduler, CancellationToken, System.Threading.Tasks.Task<IDisposable>> action)

计划异步的方法说明:

    // Summary:
    //     Schedules work using an asynchronous method, allowing for cooperative scheduling
    //     in an imperative coding style.
    //
    // Parameters:
    //   scheduler:
    //     Scheduler to schedule work on.
    //
    //   action:
    //     Asynchronous method to run the work, using Yield and Sleep operations for
    //     cooperative scheduling and injection of cancellation points.
    //
    // Returns:
    //     Disposable object that allows to cancel outstanding work on cooperative cancellation
    //     points or through the cancellation token passed to the asynchronous method.
    //
    // Exceptions:
    //   System.ArgumentNullException:
    //     scheduler or action is null.

有人可以解释这两种方法之间的区别吗?

何时应使用 ScheduleAsync?

我应该何时使用计划?

允许以命令式编码样式进行协作调度意味着什么?

谢谢。

前进

这个答案是基于 Rx 团队在这篇文章中直接给出的明确解释 - 注意它很长,涵盖的不仅仅是这一点。转到标题为在 Rx 查询运算符中利用"异步"的部分,并对所有内容进行了解释,包括标题为"使调度程序更易于使用"await"部分中有关ScheduleAsyc的特定示例

这是我试图解释的:

总结

ScheduleAsync的主要动机是采用 C# 5 的异步/等待功能,以简化编写代码,这些代码对许多事件执行"公平"调度,否则可能会导致调度程序缺乏其他操作。这就是"协作调度"的含义 - 与其他共享调度程序的代码很好地配合使用。为此,您可以计划下一个事件,然后在该事件触发之前产生控制权,并挂接到该事件以计划下一个事件,依此类推。

在 Rx 2.0 之前,这是通过递归调度实现的。

幼稚的例子

下面是链接文章中的示例,该示例提供了 Range 运算符的实现。此实现很差,因为它通过不产生控制来使调度程序匮乏:

static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
    return Observable.Create<int>(observer =>
    {
        return scheduler.Schedule(() =>
        {
            for (int i = 0; i < count; i++)
            {
                Console.WriteLine("Iteration {0}", i);
                observer.OnNext(start + i);
            }
            observer.OnCompleted();
        });
    });
}

请注意 OnNext 如何位于一个循环中,在不产生控制的情况下锤击调度程序(如果调度程序是单线程的,则尤其糟糕)。它使其他操作没有机会安排其操作,并且不允许在取消的情况下中止。我们该如何解决这个问题?

递归调度 - Rx 2.0 之前的解决方案

这是通过递归调度解决这个问题的旧方法 - 很难看到发生了什么。这不是"命令式编码风格"。递归调用self()在你第一次看到它时是非常令人脑洞大开的不透明 - 在我的情况下是第十个,尽管我最终得到了它。传奇人物巴特·德·斯梅特(Bart de Smet)的这篇经典文章将告诉您比您需要了解的更多有关此技术的信息。无论如何,这是递归样式:

static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
    return Observable.Create<int>(observer =>
    {
        return scheduler.Schedule(0, (i, self) =>
        {
            if (i < count)
            {
                Console.WriteLine("Iteration {0}", i);
                observer.OnNext(start + i);
                self(i + 1); /* Here is the recursive call */
            }
            else
            {
                observer.OnCompleted();
            }
        });
    });
}

除了更公平之外,如果订阅被释放,下一个挂起的计划操作将被取消。

新的异步/等待样式

这是通过 async/await 的编译器转换进行延续的新方法,它允许"命令式编码风格"。请注意,与递归样式相比,动机是更大的可读性 - async/await 突出地以通常与 .NET 惯用的方式显示正在发生的事情:

static IObservable<int> Range(int start, int count, IScheduler scheduler)
{
    return Observable.Create<int>(observer =>
    {
        return scheduler.ScheduleAsync(async (ctrl, ct) =>
        {
            for (int i = 0; i < count; i++)
            {
                Console.WriteLine("Iteration {0}", i);
                observer.OnNext(i);
                await ctrl.Yield(); /* Use a task continuation to schedule next event */
            }
            observer.OnCompleted();
            return Disposable.Empty;
        });
    });
}

它看起来就像一个for循环,但实际上await ctrl.Yield()将产生控制,允许其他代码进入调度程序。它使用 Task 延续一次只调度一个事件 - 也就是说,每次迭代仅在前一次迭代完成后发布到调度程序,从而避免直接在调度程序上排长队。取消也有效,这次 Rx 框架将订阅的处置转换为通过 ct 传入的取消令牌。

如果链接仍然很好,我建议阅读我从中获取的原始帖子!

相关内容

  • 没有找到相关文章

最新更新