RxJava-Combine最新但只为一个可观察的发射而火?



假设我有两个无限的Observables,它们可以在任何时候发出值。它们组合在一起形成一个Observable<ProcessFileEvent>

Observable<Integer>  selectedFileId= ...
Observable<MouseClick> buttonClick = ...
Observable<ProcessFileEvent> `processFileEvent` = Observable.combineLatest(selectedFileId, buttonClick, (s,b) -> {
    //create ProcessFileEvent here
});

问题是我只希望processFileEvent在buttonClick发出某些东西时发出,而不是selectedFileId。当输入文件ID并启动ProcessFileEvent时,这绝对不是用户期望的行为。我如何结合,但只排放时,buttonClick排放?

使用withLatestFrom:

Observable<Integer>  selectedFileId= ...
Observable<MouseClick> buttonClick = ...
Observable<ProcessFileEvent> processFileEvent = buttonClick.withLatestFrom(selectedFieldId, (b,s) -> {
    //create ProcessFileEvent here
});

它只在第一个Observable buttonClick发出时发出with

MouseClick对象使用.distinctUntilChanged()。这样,您将只在MouseClick更改时获得事件。

创建一个包含fileIdmouseClick的类:

static class FileMouseClick {
    final int fileId;
    final MouseClick mouseClick;
    FileMouseClick(int fileId, MouseClick mouseClick) {
        this.fileId = fileId;
        this.mouseClick = mouseClick;
    }
}
然后

Observable.combineLatest(selectedFileId, buttonClick, 
                         (s,b) -> new FileMouseClick(s,b))
    .distinctUntilChanged(f -> f.mouseClick)
    .map(toProcessFileEvent())

你可以使用Observable。一起来做这件事。请特别注意这一段:

然而,我能做些什么来确保这些窗口没有重叠——这样,一旦产生第二个值,我就不再看到第一个值了吗?如果我们返回左边的序列leftDurationSelector,那就可以了。等等,当我们从leftDurationSelector返回左序列,它会尝试创建另一个订阅,这可能会带来副作用。对此的快速回答是Publish和RefCount左序列。如果我们这样做,结果看起来更像这样。

left  |-0-1-2-3-4-5|
right |---A---B---C|
result|---1---3---5
          A   B   C

这个大理石图就是您想要的,其中selectedFileId是左边的序列,buttonClick是右边的序列。

这对我来说是有效的,缺失的部分是startWith()操作符。一旦你把它添加到observable中,它就会开始工作。

示例

         Observable.combineLatest(
                    firstObservable.observeDoors("1").startWith(emptyList<Door>()),
                    secondObservable.observeUnits("2").startWith( emptyList<Door>())
                ) { doors, units ->
                    ArrayList<Door>().apply {
                        addAll(units)
                        addAll(doors)
                    }
                }.map { 
                    //do something 
                }

这是因为combine需要所有的源开始发出至少一个值,它将初始化流,然后只听它的变化

相关内容

  • 没有找到相关文章

最新更新