rxpy- flat_map排放等待下一个发电机值



你好!我正在尝试完成我的第一个RXPY项目,但是我遇到了一些问题
毫无疑问python中的flat_map的行为。

在这个项目中,有一个可观察到的发电机(Kafka消费者)创建的。它在收到消息时发出值,然后根据消息执行查询,并为每个结果发出值。

我对代码进行了一些更改,以便更容易复制。Kafka消费者被一个发电机所取代,该发电机在排放之间需要大量时间,而查询结果被可观察到的3个值的可观察到的结果取代。行为仍然相同。

from rx import Observable
generator = (i for i in range(100000000) if i == 0 or i == 50000000)
Observable.from_(generator) 
    .flat_map(lambda i: Observable.from_(['a', 'b', 'c'])) 
    .subscribe(on_next=lambda i: print(i))

输出:

a
(...waits a long time...)
b
a
(...waits a long time...)
c
b
c

我期待这样的事情:

a
b
c
(...waits a long time...)
a
b
c

这种行为的原因是什么?我该怎么做才能获得预期的结果?

谢谢!:)

最近遇到了flat_map操作员的同一问题,而即时列出了这里的帮助。

初始代码更新了rxpy 3:

import rx
from rx.operators import flat_map

generator = (i for i in range(100000000) if i == 0 or i == 50000000)
rx.from_(generator).pipe(
    flat_map(
        lambda i: rx.from_(['a', 'b', 'c'])
    )
).subscribe(on_next=lambda i: print(i))

输出有所不同,但问题是相同的:

(... waits a long time ...)
a
b
c
a
b
c

在flat_map中可观察到的可观察到的立即施加器:

import rx
from rx.operators import flat_map
from rx.scheduler import ImmediateScheduler

generator = (i for i in range(100000000) if i == 0 or i == 50000000)
rx.from_(generator).pipe(
    flat_map(
        lambda i: rx.from_(['a', 'b', 'c'], scheduler=ImmediateScheduler())
    )
).subscribe(on_next=lambda i: print(i))

并得到了预期的结果:

a
b
c
(...waits a long time...)
a
b
c

相关内容

  • 没有找到相关文章

最新更新