我正在尝试取消在 akka 中取消反应流。我已经阅读了这个博客 http://bryangilbert.com/blog/2015/02/04/akka-reactive-streams/我想我对它的工作原理有了基本的了解。然而,我不明白的是这个概念中处理器的目的。这是干什么用的?订阅服务器请求 N 个对象和发布服务器使用 onNext(( 发送它们还不够吗?
假设你有一个真正简单的流程流,只有一个源(发布者(和一个接收器(订阅者(。将这两者挂接起来,接收器订阅发布服务器并开始请求数据,数据流向接收器。在此示例中,您真正需要的只是一个发布者和订阅者。 但在此示例中,从源到接收器的数据不会发生任何变化。它没有以任何方式转换,因此不是很有趣,也不是很有用。
处理器结合了发布服务器和订阅服务器接口,因此可以充当这两个角色。处理器旨在贴靠到源和接收器之间的处理流中并转换数据。如果我将一个放入之前的源/接收器示例中,数据流以及谁订阅了什么会发生变化。现在,接收器订阅到该处理器,处理器又订阅到源。接收器从处理器请求元素,处理器将上游需求传播到源。当有元件满足需求时,它还负责将元件向下推到水槽。这就是为什么它必须实现这两个接口,因为它必须填补这两个角色。
随着您添加的每个处理步骤,例如map
或filter
,您正在添加另一个可以处理背压的地方。这些步骤不是数据的起始点(源(或目标(接收器(。他们要做的就是接收数据并对其执行某些操作,或者更改其流并将元素发送到下游以满足需求。因为他们需要能够链接到任何链,所以他们需要发布和订阅功能,这就是处理器存在的原因。