任意数量的 Observable::flatMap 调用的链



假设我有几个Observable供应商对get电话有副作用:

Subject<String> firstSubject = PublishSubject.create();
Supplier<Observable<String>> firstSupplier = () -> {
System.out.println("side effect of first one");
return firstSubject;
};
Subject<String> secondSubject = PublishSubject.create();
Supplier<Observable<String>> secondSupplier = () -> {
System.out.println("side effect of second one");
return secondSubject;
};
Subject<String> thirdSubject = PublishSubject.create();
Supplier<Observable<String>> thirdSupplier = () -> {
System.out.println("side effect of third one");
return thirdSubject;
};

现在我想通过以下方式组合它们 - 在前一个供应商的Observable发出值(调用onNext(后调用下一个供应商的get。 我可以使用以下代码来做到这一点:

firstSupplier.get()
.flatMap(__ -> secondSupplier.get())
.flatMap(__ -> thirdSupplier.get())
.subscribe();
// output: side effect of first one
firstSubject.onNext("");
// output: side effect of second one
secondSubject.onNext("");
// output: side effect of third one

如何重写此代码以接受未知数量的供应商,例如作为Collection<Supplier<Observable>>传递?

我已经回顾了各种工厂Observable方法(如mergeconcat(,但是他们都在收集ObservableSource,这意味着我必须急切地打电话给我所有的供应商get。但是,就我而言,懒惰地调用它很重要 - 只有在之前的Observable发出值之后。

编辑 2:

我完全忘记了即使maxConcurrency设置为 1,Observable.flatMap也会调用映射器,并将生成的Observable排队等待稍后运行。

我希望以下设置按预期工作(即,没有指定主题的后续onNext应该发生什么(。

Subject<String> firstSubject = PublishSubject.create();
Supplier<Observable<String>> firstSupplier = () -> {
System.out.println("side effect of first one");
return firstSubject;
};
Subject<String> secondSubject = PublishSubject.create();
Supplier<Observable<String>> secondSupplier = () -> {
System.out.println("side effect of second one");
return secondSubject;
};
Subject<String> thirdSubject = PublishSubject.create();
Supplier<Observable<String>> thirdSupplier = () -> {
System.out.println("side effect of third one");
return thirdSubject;
};
Collection<Supplier<Observable<String>>> collection =
Arrays.asList(firstSupplier, secondSupplier, thirdSupplier);
Observable.fromIterable(collection)
.concatMap(supplier -> supplier.get().take(1))
.subscribe();
System.out.println("// output: side effect of first one");
firstSubject.onNext("");
System.out.println("// output: side effect of second one");
secondSubject.onNext("");
System.out.println("// output: side effect of third one");

指纹:

side effect of first one
// output: side effect of first one
side effect of second one
// output: side effect of second one
side effect of third one
// output: side effect of third one

最新更新