我正在尝试使用RxPY将ReactiveX集成到我的GUI中。这是一个通用的ReactiveX问题
假设我有一个可视化,它依赖于使用combine_latest(stream1, stream2, plot_function)
的多个可观察流。当一个可观察对象发生变化时,比如用户修改了一个值;可视化使用新值进行更新。然而,有时两个Observables是同时更新的,比如当用户从单个文件加载两个流的数据时。从技术上讲,一个Observable会在另一个之前被更新(以import函数中先出现的为准)。因此,情节将被更新两次,但实际上它只需要更新一次。
我的一些可视化计算成本很高,所以我想确保如果两个流同时更新,那么合并后的流只发出一个值。我可以想到几个选项:
-
在合并流上使用
debounce()
并设置一个小超时(如50ms)。 -
不要直接使用
combine_latest
。将两个流包装在一个新对象中,该对象也具有某种updating
标志。如果我将updating
标志设置为True,那么在我将updating
标志设置为False之前不要发出任何东西。这种方法给人的感觉是有状态的,而且它破坏了流的可组合性。 -
告诉所有可视化不要更新,直到所有流更新。同样,这会破坏封装,因为可视化不应该关心上游发生了什么。它应该只接收来自合并流的新值,并生成一个漂亮的图片。
-
使可视化的粒度足够细,以便首先更新一个流只会带来很小的性能损失。这对于某些可视化来说是不可能的,比如基于点和网格大小计算网格的可视化。如果或点改变了网格大小,则需要重新计算整个网格。
是否有一些设施在Rx处理"同时"更新多个流?我觉得我要求的是魔法。
对于任何使用Rx制作GUI程序的人:除了通过流发送新值之外,我应该为模型使用一些更好的架构吗?
如果这个问题不清楚,请在评论中告诉我,我会尽量举一个更具体的例子。
下面是一个示例Python RxPY程序:
import rx
stream1 = rx.subjects.BehaviorSubject(1)
stream2 = rx.subjects.BehaviorSubject(2)
rx.Observable
.combine_latest(stream1, stream2, lambda x, y: (x, y))
.subscribe(print)
stream1.on_next(3)
stream2.on_next(4)
这个打印:
(1, 2)
(3, 2)
(3, 4)
如何同时更新stream1
和stream2
的值,使结果如下?
(1, 2)
(3, 4)
换句话说,我如何修改combine_latest
,这样我就可以告诉下游"嘿,等一下,在你发出下一个值之前,我要更新其他流"?
我找到了一个可能的答案,但它不是最好的,我想要其他的。
我发现了pausable
组合子。通过传入一个发出True或False的流,你可以控制一个序列是否暂停。下面是对我的例子的修改:
import rx
stream1 = rx.subjects.BehaviorSubject(1)
stream2 = rx.subjects.BehaviorSubject(2)
pauser = rx.subjects.BehaviorSubject(True)
rx.Observable
.combine_latest(stream1, stream2, lambda x, y: (x, y))
.pausable(pauser)
.subscribe(print)
# Begin updating simultaneously
pauser.on_next(False)
stream1.on_next(3)
stream2.on_next(4)
# Update done, resume combined stream
pauser.on_next(True)
# Prints:
# (1, 2)
# (3, 4)
要应用到我的GUI,我可以在我的模型中创建一个名为updating
的BehaviorSubject
,它发出整个模型是否正在更新。例如,如果stream1
和stream2
同时更新,那么我可以将updating
设置为True。对于任何制作成本高的可视化,我可以应用updating
的值来暂停合并的流。
这在Rx中适用于c#:
var throttled = source.Publish(hot => hot.Buffer(() => hot.Throttle(dueTime));
这里的dueTime
值是。net中的TimeSpan
。它仅仅说明了在它产生一个值之前,您希望处于不活动状态的时间窗口是什么。这基本上吞噬了在一段时间内"同时"产生的值。
本例中的source
就是你的.combine_latest(...)
observable