RxJava2如何在压缩两个数据源时完成一些操作



我正在处理两个mono-wav文件,我想将它们保存为立体声文件。每个单声道都按如下方式进行流式传输和订阅。单声道数据可以具有不同的长度。我使用zip操作符来处理对。问题是我必须打电话给

fileWriter.stopRecord();

当处理完这两个数据时。所以我想在zip onCompleted方法中完成它,但它不起作用,因为处理完一个mono(调用onCompleted(后,它会调用zip onCompleted,但另一个mono-仍在处理中。那么,我如何确保在处理完两个mono之后调用zip onCompleted呢?对中的一个值可能为空并不重要。

Observable<HermesMessages.AudioChunk> speakerAudioObservable = this.channel.getSpeakerAudio();
speakerAudioObservable.subscribe(
sample -> {
log.trace("Processing speaker !!!" );
},
error -> {
log.error("Error during writing to file", error);
}, () -> {
log.trace("Completed writing speaker audio to file.");
});
Observable<HermesMessages.AudioChunk> dronAudioObservable = channel.getDronAudio();
dronAudioObservable.subscribe(
sample -> {
log.trace("Processing dron !!!" );
},
error -> {
log.error("Error during writing to file", error);
},
() -> {
log.trace("Completed writing dron audio to file.");
});
Observable.zip(speakerAudioObservable, dronAudioObservable, Pair::create)
.subscribe(
pair -> {
fileWriter.streamSecData(pair.first().getContent());
fileWriter.workerJob();
fileWriter.streamData(pair.second().getContent());
fileWriter.workerJob();
},
error -> {
log.error("zip Error during writing to file", error);
fileWriter.stopRecord();
}, () -> {
log.trace("zip Completed writing speaker audio to file.");
fileWriter.stopRecord();
});

使用zip不能做到这一点,因为当其中一个可观察zipped完成发射项目时,流就完成了。(来源:http://reactivex.io/documentation/operators/zip.html)。

但您可以使用另一个名为CombineLatest的运算符。它将一个可观测对象发射的一个项目和另一个可观察对象发射的最后一个项目组合在一起。不利的一面是,你必须检查一个项目是否已经"处理"(已经从这个项目创建了一对(。因此,您必须更改Pair::create方法。也不需要单独订阅每个可观察到的内容。

来源:http://reactivex.io/documentation/operators/combinelatest.html

我用一些Integer做了一个简单的解决方案测试希望这个解决方案能帮助你2个具有不同大小的整数流

public void run(String... args) throws Exception {
Observable<Integer> integerObservable1 = Observable.just(1, 2, 3,4);
Observable<Integer> integerObservable2 = Observable.just(1, 2);
runObservable(integerObservable1, integerObservable2)
.map(o -> {
System.out.println(o);
return o;
}).subscribe();
}
private Observable<Object> runObservable(Observable<Integer> integerObservable1, Observable<Integer> integerObservable2) {
return Observable.merge(
Observable.zip(integerObservable1, integerObservable2, (BiFunction<Integer, Integer, Object>) (integer, integer2) -> integer + " " + integer2),
Single.zip(integerObservable1.toList(), integerObservable2.toList(), (integers, integers2) -> Math.min(integers.size(), integers2.size()))
.flatMapObservable(integer -> Observable.merge(integerObservable1.skip(integer), integerObservable2.skip(integer)))
.map(integer -> integer + " " + "static value")
);
}

代码输出:

1 1

2 2

3静态值

4静态值

由于流1有2个元素,流2有4个元素,所以我将其保留为静态,因为rxjava不支持null,但您可以选择静态值是null还是最后一个值

最新更新