我正在阅读有关观察者的Monix文档,我遇到了以下示例:
或者,您可以快速构建一个仅记录其事件的事件的观察程序 接收。我们将在其他示例中使用它:
import monix.reactive.Observer val out = Observer.dump("O") // out: Observer.Sync[Any] out.onNext(1) //=> 0: O-->1 // res0: Ack = Continue out.onNext(2) //=> 1: O-->2 // res0: Ack = Continue out.onComplete() //=> 2: O completed
下一个非法的例子:
喂入两个元素,然后停止。这是不合法的:
// BAD SAMPLE observer.onNext(1) observer.onNext(2) observer.onComplete()
所以我们可以看到相同的onNext -> onNext -> onComplete
链。这不合法吗?为什么?
在您链接的文档中,它直接在示例😁之后进行了解释
这是合法的方式:
observer.onNext(1).map {
case Continue =>
// We have permission to continue
observer.onNext(2)
// No back-pressure required here
observer.onComplete()
Stop
case Stop =>
// Nothing else to do
Stop
}
正如您在评论中看到的,问题是背压。那么,为什么有一个例子,使用似乎非法的.dump
呢?
请注意该示例中的注释:
//=> 0: O-->1
// res0: Ack = Continue
这些注释显示了如果在 Scala REPL 中运行它会得到什么。当您输入表达式并按回车键时,REPL 会打印类似res0
的内容,并让您知道最后一个命令的返回值是什么。
所以这个例子演示:
- 从 REPL 中馈送观察者
- 每个
.onNext
都已完成Continue
编写一个以这种方式为观察者提供信息的程序是不正确的,但这是对为观察者提供食物的合法执行的正确转录。
您可以在"合同"部分下查看与背压相关的规则:
- 背压:每个 onNext 调用必须等待上一个 onNext 调用的 Future[Ack] 返回的"继续"结果。
- onComplete 和 onError 的背压是可选的:当调用 onComplete 或 onError 时,您不需要等待上一个 onNext 的 Future[Ack]。
这是一个很好的观点,因为优雅的背压管理是反应流的一大承诺。