调度器:即时线程与当前线程



阅读原因的解释后

Observable.Return(5)
  .Repeat()
  .Take(1)

从未完成,但

Observable.Return(5, Scheduler.CurrentThread)
  .Repeat()
  .Take(1)

如预期工作。我仍然很困惑,我不知道为什么CurrentThread真的解决了这个问题。有人能解释清楚吗?

Ned Stoyanov在上面的评论中提供的链接有Dave Sexton的精彩解释。

我会尝试用不同的方式来说明它。以递归方法中发生递归调用为例。

public class RecursiveTest()
{
    private bool _isDone;
    public void RecursiveMethod()
    {
        if (!_isDone)
        {
            RecursiveMethod();
           // Never gets here...
           _isDone = true;
        }
    }  
}

您可以很容易地看到,这种情况将无限期地重复出现(直到发生StackOverflowException),因为_isDone永远不会设置为true。这是一个过于简化的例子,但它基本上就是你的第一个例子。

这是Dave Sexton对第一个例子中发生的事情的解释。

默认情况下,Return使用ImmediateScheduler调用OnNext(1),然后OnCompleted()。Repeat没有引入任何并发性,因此它看到OnCompleted立即,然后立即重新订阅Return。因为《归来》中没有蹦床,所以这种模式会重复出现,无限期地阻塞当前线程。对此调用Subscribe可观察的永远不会回来。

换句话说,由于可重入性的无限循环,初始流永远不会完全完成。因此,我们需要一种方法来完成初始流程,而不需要重新进入。

让我们回到我在这篇文章中的RecursiveTest例子,避免无限递归的解决方案是什么?在再次执行递归方法之前,我们需要递归方法完成其流程。这样做的一种方法是有一个队列,并将对递归方法的调用排入队列,如下所示:

public void RecursiveMethod()
{
    if (!_isDone)
    {
        Enqueue(RecursiveMethod);
        _isDone = true;
    }
}  

这样,初始流将完成,_isDone将设置为true,并且当执行对RecursiveMethod的下一个调用时,将不再执行任何内容,从而避免无限递归。这几乎就是Scheduler.CurrentThread将对您的第二个示例执行的操作。

让我们看看Dave Sexton如何解释第二个例子的工作原理:

这里,Return使用CurrentTheadScheduler调用OnNext(1),然后OnCompleted()。Repeat没有引入任何并发性,因此它看到OnCompleted立即,然后立即重新订阅Return;但是,对Return的第二次订阅安排其(内部)蹦床上的动作,因为它仍在蹦床上执行来自第一个计划(外部)操作的OnCompleted回调,因此重复不会立即发生。这允许Repeat返回一次性到Take,最终调用OnCompleted,取消通过处理Repeat进行重复,并最终从Subscribe调用退货。

再一次,我的例子被简化了,使其易于理解,但它并不是真正的工作方式。在这里,您可以看到调度器是如何真正工作的。它使用了他们所说的蹦床,这基本上是一个确保没有可重入呼叫的队列。因此,所有调用都在同一个线程上一个接一个地序列化。通过这样做,可以完成初始流,从而避免无限重入循环。

希望这更清楚一点:)

相关内容

  • 没有找到相关文章

最新更新