如何使用python rx的重播?



我希望能够在已经碰巧是 BehaviorSubject 的流上使用重播运算符。 从本质上讲,我希望印刷订阅接收事先发送的排放量:0、1、2、3、4、5。

我试图用.subscribe()使可观察到的热.另外,我确实知道使源成为ReplaySubject可以解决问题,但这对我来说不是一个选择。

与大多数语言不同,Python 的rx v3.0使用管道命令来链接操作 - 例如replay()ref_count()publish()- 而不是通常的"."链。以下是管道命令的链接:https://rxpy.readthedocs.io/en/latest/migration.html#pipe-based-operator-chaining

我很确定这个问题与我给 replay(( 的第一个参数有关:lambda x: x.

import rx.subject
from rx import operators as op
stream = rx.subject.BehaviorSubject(0)
replayable_observable = stream.pipe(op.replay(lambda x: x, buffer_size=100))
replayable_observable.subscribe()
stream.subscribe()
for x in [1, 2, 3, 4, 5]:
stream.on_next(x)
replayable_observable.subscribe(lambda value: print("Received {0}".format(value)))
for x in [6, 7, 8, 9, 10]:
stream.on_next(x)

我希望收到0-10;或者1-10。但相反,我收到了5-10.

Received 5
Received 6
Received 7
Received 8
Received 9
Received 10

看起来要replay的第一个参数(mapper函数(旨在允许您在多播后将更多运算符链接到可观察的源。例如,如果将其从lambda x: x更改为lambda x: x.pipe(op.map(lambda y: y * 2))则将获得双倍的值。

replay的文档似乎已经过时了,因为它们提供的示例mapper函数仍然使用旧式的方法链接而不是管道方法。此外,所有实际使用mapper参数非默认值的replay测试似乎都在此项目的 GitHub 存储库中被注释掉,因此没有明确的例子说明如何正确使用此参数。

通过查看源代码,我可以说的是,当您使用replay而不指定mapper时,您会返回ConnectableObservable而不是Observable。在将值推送到主题之前连接此ConnectableObservable可以正确缓冲结果。

import rx.subject
from rx import operators as op
stream = rx.subject.BehaviorSubject(0)
replayable_observable = stream.pipe(op.replay(buffer_size=100))
replayable_observable.connect()
for x in [1, 2, 3, 4, 5]:
stream.on_next(x)
replayable_observable.subscribe(lambda value: print("Received {0}".format(value)))
for x in [6, 7, 8, 9, 10]:
stream.on_next(x)
# Received 0
# Received 1
# Received 2
# Received 3
# Received 4
# Received 5
# Received 6
# Received 7
# Received 8
# Received 9
# Received 10

相关内容

  • 没有找到相关文章

最新更新