我有ConnectableObservable
的List
,并且我想在前一项完成时运行列表中的一项。我试过在列表上应用concat()
方法,但显然这种方法不适用于ConnectableObservables。我该怎么做呢?
这是我尝试过的:
ConnectableObservable<Long> observable1 =
Observable.timer(1500, TimeUnit.MILLISECONDS).publish();
ConnectableObservable<Long> observable2 =
Observable.timer(1550, TimeUnit.MILLISECONDS).publish();
List<ConnectableObservable<Long>> list = new ArrayList<>();
list.add(observable1);
list.add(observable2);
Observable.concat(list).doOnNext(aLong -> {
Log.i("result", aLong.toString());
}).subscribe();
observable1.connect();
observable2.connect();
这里,observable2
在observable1
完成后运行50毫秒,而不是预期的1550毫秒。
您没有在ConnectableObservable
上调用connect()
方法,因此它还没有开始发布任何内容。
把
observable1.connect();
observable2.connect();
代码末尾的。或者,您可以将.autoConnect(1)
添加到observable1
和observable2
中。
它不会同时工作,因为下面的junit确认它:
@Test
void connectableTest() {
TestScheduler testScheduler = new TestScheduler();
ConnectableObservable<Integer> observable1 =
Observable
.just(1, 2, 3)
.zipWith(
Observable.interval(0, 1, TimeUnit.SECONDS, testScheduler), (integer, time) -> integer)
.publish();
ConnectableObservable<Integer> observable2 =
Observable
.just(4, 5, 6)
.zipWith(
Observable.interval(3, 1, TimeUnit.SECONDS, testScheduler), (integer, time) -> integer)
.publish();
List<ConnectableObservable<Integer>> list = new ArrayList<>();
list.add(observable1);
list.add(observable2);
TestObserver<Integer> output = Observable.concat(list).test();
observable1.connect();
observable2.connect();
testScheduler.advanceTimeBy(20, TimeUnit.SECONDS);
output.assertValues(1,2,3,4,5,6).assertComplete();
}