我希望能够在已经碰巧是 BehaviorSubject 的流上使用重播运算符。 从本质上讲,我希望印刷订阅接收事先发送的排放量:0、1、2、3、4、5。
我试图用.subscribe()
使可观察到的热.另外,我确实知道使源成为ReplaySubject
可以解决问题,但这对我来说不是一个选择。
与大多数语言不同,Python 的rx v3.0
使用管道命令来链接操作 - 例如replay()
或ref_count()
或publish()
- 而不是通常的"."链。以下是管道命令的链接:https://rxpy.readthedocs.io/en/latest/migration.html#pipe-based-operator-chaining
我很确定这个问题与我给 replay(( 的第一个参数有关:lambda x: x
.
import rx.subject
from rx import operators as op
stream = rx.subject.BehaviorSubject(0)
replayable_observable = stream.pipe(op.replay(lambda x: x, buffer_size=100))
replayable_observable.subscribe()
stream.subscribe()
for x in [1, 2, 3, 4, 5]:
stream.on_next(x)
replayable_observable.subscribe(lambda value: print("Received {0}".format(value)))
for x in [6, 7, 8, 9, 10]:
stream.on_next(x)
我希望收到0-10
;或者1-10
。但相反,我收到了5-10
.
Received 5
Received 6
Received 7
Received 8
Received 9
Received 10
看起来要replay
的第一个参数(mapper
函数(旨在允许您在多播后将更多运算符链接到可观察的源。例如,如果将其从lambda x: x
更改为lambda x: x.pipe(op.map(lambda y: y * 2))
则将获得双倍的值。
replay
的文档似乎已经过时了,因为它们提供的示例mapper
函数仍然使用旧式的方法链接而不是管道方法。此外,所有实际使用mapper
参数非默认值的replay
测试似乎都在此项目的 GitHub 存储库中被注释掉,因此没有明确的例子说明如何正确使用此参数。
通过查看源代码,我可以说的是,当您使用replay
而不指定mapper
时,您会返回ConnectableObservable
而不是Observable
。在将值推送到主题之前连接此ConnectableObservable
可以正确缓冲结果。
import rx.subject
from rx import operators as op
stream = rx.subject.BehaviorSubject(0)
replayable_observable = stream.pipe(op.replay(buffer_size=100))
replayable_observable.connect()
for x in [1, 2, 3, 4, 5]:
stream.on_next(x)
replayable_observable.subscribe(lambda value: print("Received {0}".format(value)))
for x in [6, 7, 8, 9, 10]:
stream.on_next(x)
# Received 0
# Received 1
# Received 2
# Received 3
# Received 4
# Received 5
# Received 6
# Received 7
# Received 8
# Received 9
# Received 10