你好!我正在尝试完成我的第一个RXPY项目,但是我遇到了一些问题
毫无疑问python中的flat_map的行为。
在这个项目中,有一个可观察到的发电机(Kafka消费者)创建的。它在收到消息时发出值,然后根据消息执行查询,并为每个结果发出值。
我对代码进行了一些更改,以便更容易复制。Kafka消费者被一个发电机所取代,该发电机在排放之间需要大量时间,而查询结果被可观察到的3个值的可观察到的结果取代。行为仍然相同。
from rx import Observable
generator = (i for i in range(100000000) if i == 0 or i == 50000000)
Observable.from_(generator)
.flat_map(lambda i: Observable.from_(['a', 'b', 'c']))
.subscribe(on_next=lambda i: print(i))
输出:
a
(...waits a long time...)
b
a
(...waits a long time...)
c
b
c
我期待这样的事情:
a
b
c
(...waits a long time...)
a
b
c
这种行为的原因是什么?我该怎么做才能获得预期的结果?
谢谢!:)
最近遇到了flat_map操作员的同一问题,而即时列出了这里的帮助。
初始代码更新了rxpy 3:
import rx
from rx.operators import flat_map
generator = (i for i in range(100000000) if i == 0 or i == 50000000)
rx.from_(generator).pipe(
flat_map(
lambda i: rx.from_(['a', 'b', 'c'])
)
).subscribe(on_next=lambda i: print(i))
输出有所不同,但问题是相同的:
(... waits a long time ...)
a
b
c
a
b
c
在flat_map中可观察到的可观察到的立即施加器:
import rx
from rx.operators import flat_map
from rx.scheduler import ImmediateScheduler
generator = (i for i in range(100000000) if i == 0 or i == 50000000)
rx.from_(generator).pipe(
flat_map(
lambda i: rx.from_(['a', 'b', 'c'], scheduler=ImmediateScheduler())
)
).subscribe(on_next=lambda i: print(i))
并得到了预期的结果:
a
b
c
(...waits a long time...)
a
b
c