阅读原因的解释后
Observable.Return(5)
.Repeat()
.Take(1)
从未完成,但
Observable.Return(5, Scheduler.CurrentThread)
.Repeat()
.Take(1)
如预期工作。我仍然很困惑,我不知道为什么CurrentThread
真的解决了这个问题。有人能解释清楚吗?
Ned Stoyanov在上面的评论中提供的链接有Dave Sexton的精彩解释。
我会尝试用不同的方式来说明它。以递归方法中发生递归调用为例。
public class RecursiveTest()
{
private bool _isDone;
public void RecursiveMethod()
{
if (!_isDone)
{
RecursiveMethod();
// Never gets here...
_isDone = true;
}
}
}
您可以很容易地看到,这种情况将无限期地重复出现(直到发生StackOverflowException),因为_isDone永远不会设置为true。这是一个过于简化的例子,但它基本上就是你的第一个例子。
这是Dave Sexton对第一个例子中发生的事情的解释。
默认情况下,Return使用ImmediateScheduler调用OnNext(1),然后OnCompleted()。Repeat没有引入任何并发性,因此它看到OnCompleted立即,然后立即重新订阅Return。因为《归来》中没有蹦床,所以这种模式会重复出现,无限期地阻塞当前线程。对此调用Subscribe可观察的永远不会回来。
换句话说,由于可重入性的无限循环,初始流永远不会完全完成。因此,我们需要一种方法来完成初始流程,而不需要重新进入。
让我们回到我在这篇文章中的RecursiveTest例子,避免无限递归的解决方案是什么?在再次执行递归方法之前,我们需要递归方法完成其流程。这样做的一种方法是有一个队列,并将对递归方法的调用排入队列,如下所示:
public void RecursiveMethod()
{
if (!_isDone)
{
Enqueue(RecursiveMethod);
_isDone = true;
}
}
这样,初始流将完成,_isDone将设置为true,并且当执行对RecursiveMethod的下一个调用时,将不再执行任何内容,从而避免无限递归。这几乎就是Scheduler.CurrentThread将对您的第二个示例执行的操作。
让我们看看Dave Sexton如何解释第二个例子的工作原理:
这里,Return使用CurrentTheadScheduler调用OnNext(1),然后OnCompleted()。Repeat没有引入任何并发性,因此它看到OnCompleted立即,然后立即重新订阅Return;但是,对Return的第二次订阅安排其(内部)蹦床上的动作,因为它仍在蹦床上执行来自第一个计划(外部)操作的OnCompleted回调,因此重复不会立即发生。这允许Repeat返回一次性到Take,最终调用OnCompleted,取消通过处理Repeat进行重复,并最终从Subscribe调用退货。
再一次,我的例子被简化了,使其易于理解,但它并不是真正的工作方式。在这里,您可以看到调度器是如何真正工作的。它使用了他们所说的蹦床,这基本上是一个确保没有可重入呼叫的队列。因此,所有调用都在同一个线程上一个接一个地序列化。通过这样做,可以完成初始流,从而避免无限重入循环。
希望这更清楚一点:)