了解莫尼克斯中的观察者



我正在阅读有关观察者的Monix文档,我遇到了以下示例:

或者,您可以快速构建一个仅记录其事件的事件的观察程序 接收。我们将在其他示例中使用它:

import monix.reactive.Observer
val out = Observer.dump("O")
// out: Observer.Sync[Any]
out.onNext(1)
//=> 0: O-->1
// res0: Ack = Continue
out.onNext(2)
//=> 1: O-->2
// res0: Ack = Continue
out.onComplete()
//=> 2: O completed

下一个非法的例子:

喂入两个元素,然后停止。这是不合法的:

// BAD SAMPLE
observer.onNext(1)
observer.onNext(2)
observer.onComplete()

所以我们可以看到相同的onNext -> onNext -> onComplete链。这不合法吗?为什么?

在您链接的文档中,它直接在示例😁之后进行了解释

这是合法的方式:

observer.onNext(1).map {
case Continue =>
// We have permission to continue
observer.onNext(2)
// No back-pressure required here
observer.onComplete()
Stop
case Stop =>
// Nothing else to do
Stop
}

正如您在评论中看到的,问题是背压。那么,为什么有一个例子,使用似乎非法的.dump呢?

请注意该示例中的注释:

//=> 0: O-->1
// res0: Ack = Continue

这些注释显示了如果在 Scala REPL 中运行它会得到什么。当您输入表达式并按回车键时,REPL 会打印类似res0的内容,并让您知道最后一个命令的返回值是什么。

所以这个例子演示:

  • 从 REPL 中馈送观察者
  • 每个.onNext都已完成Continue

编写一个以这种方式为观察者提供信息的程序是不正确的,但这是对为观察者提供食物的合法执行的正确转录。

您可以在"合同"部分下查看与背压相关的规则:

  1. 背压:每个 onNext 调用必须等待上一个 onNext 调用的 Future[Ack] 返回的"继续"结果。
  2. onComplete 和 onError 的背压是可选的:当调用 onComplete 或 onError 时,您不需要等待上一个 onNext 的 Future[Ack]。

这是一个很好的观点,因为优雅的背压管理是反应流的一大承诺。

相关内容

  • 没有找到相关文章

最新更新