在RxJava中等待多个异步调用完成



我的场景很简单,但是我似乎找不到它。

我有一组想要迭代的元素,对每个元素调用一个异步函数,然后等待它们全部完成(这再次以异步方式发生,在函数的逻辑中实现)。我对RxJava比较陌生,并且习惯于在NodeJS中通过向函数传递回调并在最后等待来轻松地完成此操作。下面是我需要的伪代码(元素的迭代器不需要是同步的也不需要是有序的):

for(line in lines){
 callAsyncFunction(line);
}
WAIT FOR ALL OF THEM TO FINISH

非常感谢你的帮助!

使用Rx:

Observable
.from(lines)
.flatMap(line -> callAsyncFunctionThatReturnsObservable(line).subscribeOn(Schedulers.io())
.ignoreElements();

现在,根据你想做的,你可以使用.switchIfEmpty(...)来订阅另一个可观察对象。

从技术上讲,如果你仔细想想,你需要做的是从你所有的元素中创建一个Observable,然后把它们压缩在一起,继续你的流的执行。

在伪代码中是这样的:

List<Observable<?>> observables = new ArrayList<>();
for(line in lines){
   observables.add(Observable.fromCallable(callAsyncFunction(line));
}
Observable.zip(observables, new Function<...>() { ... }); // kinda like Promise.all()

但是,Observable.from()可以将可迭代对象中的每个元素公开为对象流,从而消除了循环的需要,这可能并不奇怪。所以你可以创建一个新的Observable,在异步操作完成时调用onCompleted(),使用Observable.fromCallable()。之后,你可以等待这些新的可观察对象,将它们收集到一个列表中。

Observable.from(lines)
   .flatMap(new Func1<String, Observable<?>>() {
        @Override
        public Observable<?> call(String line) {
            return Observable.fromCallable(callAsyncFunction(line)); // returns Callable
        }
    }).toList()
      .map(new Func1<List<Object>, Object>() {
        @Override
        public Object call(List<Object> ignored) {
            // do something;
        }
    });

我的后半部分答案很大程度上是基于这个答案。

使用defer将"items to compute"的Iterable转换为observable的Iterable,然后压缩observable。

更困难的部分将是"等待他们全部完成"。正如您可能已经猜到的那样,响应式扩展是关于对内容的"反应",而不是"等待内容发生"。你可以订阅这个可观察对象,它会发出一个项目,然后在每个可观察对象只有一个项目时完成。订阅者可以在等待之后做任何你通常会做的事情;这允许您返回并让您的代码在不阻塞的情况下做它的事情。

相关内容

  • 没有找到相关文章

最新更新