假设我有几个Observable
供应商对get
电话有副作用:
Subject<String> firstSubject = PublishSubject.create();
Supplier<Observable<String>> firstSupplier = () -> {
System.out.println("side effect of first one");
return firstSubject;
};
Subject<String> secondSubject = PublishSubject.create();
Supplier<Observable<String>> secondSupplier = () -> {
System.out.println("side effect of second one");
return secondSubject;
};
Subject<String> thirdSubject = PublishSubject.create();
Supplier<Observable<String>> thirdSupplier = () -> {
System.out.println("side effect of third one");
return thirdSubject;
};
现在我想通过以下方式组合它们 - 在前一个供应商的Observable
发出值(调用onNext
(后调用下一个供应商的get
。 我可以使用以下代码来做到这一点:
firstSupplier.get()
.flatMap(__ -> secondSupplier.get())
.flatMap(__ -> thirdSupplier.get())
.subscribe();
// output: side effect of first one
firstSubject.onNext("");
// output: side effect of second one
secondSubject.onNext("");
// output: side effect of third one
如何重写此代码以接受未知数量的供应商,例如作为Collection<Supplier<Observable>>
传递?
我已经回顾了各种工厂Observable
方法(如merge
,concat
(,但是他们都在收集ObservableSource
,这意味着我必须急切地打电话给我所有的供应商get
。但是,就我而言,懒惰地调用它很重要 - 只有在之前的Observable
发出值之后。
编辑 2:
我完全忘记了即使maxConcurrency
设置为 1,Observable.flatMap
也会调用映射器,并将生成的Observable
排队等待稍后运行。
我希望以下设置按预期工作(即,没有指定主题的后续onNext
应该发生什么(。
Subject<String> firstSubject = PublishSubject.create();
Supplier<Observable<String>> firstSupplier = () -> {
System.out.println("side effect of first one");
return firstSubject;
};
Subject<String> secondSubject = PublishSubject.create();
Supplier<Observable<String>> secondSupplier = () -> {
System.out.println("side effect of second one");
return secondSubject;
};
Subject<String> thirdSubject = PublishSubject.create();
Supplier<Observable<String>> thirdSupplier = () -> {
System.out.println("side effect of third one");
return thirdSubject;
};
Collection<Supplier<Observable<String>>> collection =
Arrays.asList(firstSupplier, secondSupplier, thirdSupplier);
Observable.fromIterable(collection)
.concatMap(supplier -> supplier.get().take(1))
.subscribe();
System.out.println("// output: side effect of first one");
firstSubject.onNext("");
System.out.println("// output: side effect of second one");
secondSubject.onNext("");
System.out.println("// output: side effect of third one");
指纹:
side effect of first one
// output: side effect of first one
side effect of second one
// output: side effect of second one
side effect of third one
// output: side effect of third one