当两个可观察对象同时变化时合并



我正在尝试使用RxPY将ReactiveX集成到我的GUI中。这是一个通用的ReactiveX问题

假设我有一个可视化,它依赖于使用combine_latest(stream1, stream2, plot_function)的多个可观察流。当一个可观察对象发生变化时,比如用户修改了一个值;可视化使用新值进行更新。然而,有时两个Observables是同时更新的,比如当用户从单个文件加载两个流的数据时。从技术上讲,一个Observable会在另一个之前被更新(以import函数中先出现的为准)。因此,情节将被更新两次,但实际上它只需要更新一次。

我的一些可视化计算成本很高,所以我想确保如果两个流同时更新,那么合并后的流只发出一个值。我可以想到几个选项:

  1. 在合并流上使用debounce()并设置一个小超时(如50ms)。

  2. 不要直接使用combine_latest。将两个流包装在一个新对象中,该对象也具有某种updating标志。如果我将updating标志设置为True,那么在我将updating标志设置为False之前不要发出任何东西。这种方法给人的感觉是有状态的,而且它破坏了流的可组合性。

  3. 告诉所有可视化不要更新,直到所有流更新。同样,这会破坏封装,因为可视化不应该关心上游发生了什么。它应该只接收来自合并流的新值,并生成一个漂亮的图片。

  4. 使可视化的粒度足够细,以便首先更新一个流只会带来很小的性能损失。这对于某些可视化来说是不可能的,比如基于点和网格大小计算网格的可视化。如果点改变了网格大小,则需要重新计算整个网格。

是否有一些设施在Rx处理"同时"更新多个流?我觉得我要求的是魔法。

对于任何使用Rx制作GUI程序的人:除了通过流发送新值之外,我应该为模型使用一些更好的架构吗?

如果这个问题不清楚,请在评论中告诉我,我会尽量举一个更具体的例子。

下面是一个示例Python RxPY程序:

import rx
stream1 = rx.subjects.BehaviorSubject(1)
stream2 = rx.subjects.BehaviorSubject(2)
rx.Observable
  .combine_latest(stream1, stream2, lambda x, y: (x, y))
  .subscribe(print)
stream1.on_next(3)
stream2.on_next(4)
这个打印

:

(1, 2)
(3, 2)
(3, 4)

如何同时更新stream1stream2的值,使结果如下?

(1, 2)
(3, 4)

换句话说,我如何修改combine_latest,这样我就可以告诉下游"嘿,等一下,在你发出下一个值之前,我要更新其他流"?

我找到了一个可能的答案,但它不是最好的,我想要其他的。

我发现了pausable组合子。通过传入一个发出True或False的流,你可以控制一个序列是否暂停。下面是对我的例子的修改:

import rx
stream1 = rx.subjects.BehaviorSubject(1)
stream2 = rx.subjects.BehaviorSubject(2)
pauser = rx.subjects.BehaviorSubject(True)
rx.Observable
         .combine_latest(stream1, stream2, lambda x, y: (x, y))
         .pausable(pauser)
         .subscribe(print)
# Begin updating simultaneously
pauser.on_next(False)
stream1.on_next(3)
stream2.on_next(4)
# Update done, resume combined stream
pauser.on_next(True)
# Prints:
# (1, 2)
# (3, 4)

要应用到我的GUI,我可以在我的模型中创建一个名为updatingBehaviorSubject,它发出整个模型是否正在更新。例如,如果stream1stream2同时更新,那么我可以将updating设置为True。对于任何制作成本高的可视化,我可以应用updating的值来暂停合并的流。

这在Rx中适用于c#:

var throttled = source.Publish(hot => hot.Buffer(() => hot.Throttle(dueTime));

这里的dueTime值是。net中的TimeSpan。它仅仅说明了在它产生一个值之前,您希望处于不活动状态的时间窗口是什么。这基本上吞噬了在一段时间内"同时"产生的值。

本例中的source就是你的.combine_latest(...) observable

相关内容

  • 没有找到相关文章

最新更新