Concat ConnectableObservables



我有ConnectableObservableList,并且我想在前一项完成时运行列表中的一项。我试过在列表上应用concat()方法,但显然这种方法不适用于ConnectableObservables。我该怎么做呢?

这是我尝试过的:

ConnectableObservable<Long> observable1 =
Observable.timer(1500, TimeUnit.MILLISECONDS).publish();
ConnectableObservable<Long> observable2 =
Observable.timer(1550, TimeUnit.MILLISECONDS).publish();

List<ConnectableObservable<Long>> list = new ArrayList<>();
list.add(observable1);
list.add(observable2);
Observable.concat(list).doOnNext(aLong -> {
Log.i("result", aLong.toString());
}).subscribe();
observable1.connect();
observable2.connect();

这里,observable2observable1完成后运行50毫秒,而不是预期的1550毫秒。

您没有在ConnectableObservable上调用connect()方法,因此它还没有开始发布任何内容。

observable1.connect();
observable2.connect();

代码末尾的。或者,您可以将.autoConnect(1)添加到observable1observable2中。

它不会同时工作,因为下面的junit确认它:

@Test
void connectableTest() {
TestScheduler testScheduler = new TestScheduler();
ConnectableObservable<Integer> observable1 =
Observable
.just(1, 2, 3)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler), (integer, time) -> integer)
.publish();
ConnectableObservable<Integer> observable2 =
Observable
.just(4, 5, 6)
.zipWith(
Observable.interval(3, 1, TimeUnit.SECONDS, testScheduler), (integer, time) -> integer)
.publish();
List<ConnectableObservable<Integer>> list = new ArrayList<>();
list.add(observable1);
list.add(observable2);
TestObserver<Integer> output = Observable.concat(list).test();
observable1.connect();
observable2.connect();
testScheduler.advanceTimeBy(20, TimeUnit.SECONDS);
output.assertValues(1,2,3,4,5,6).assertComplete();
}

相关内容

  • 没有找到相关文章

最新更新