我的场景很简单,但是我似乎找不到它。
我有一组想要迭代的元素,对每个元素调用一个异步函数,然后等待它们全部完成(这再次以异步方式发生,在函数的逻辑中实现)。我对RxJava比较陌生,并且习惯于在NodeJS中通过向函数传递回调并在最后等待来轻松地完成此操作。下面是我需要的伪代码(元素的迭代器不需要是同步的也不需要是有序的):
for(line in lines){
callAsyncFunction(line);
}
WAIT FOR ALL OF THEM TO FINISH
非常感谢你的帮助!
使用Rx:
Observable
.from(lines)
.flatMap(line -> callAsyncFunctionThatReturnsObservable(line).subscribeOn(Schedulers.io())
.ignoreElements();
现在,根据你想做的,你可以使用.switchIfEmpty(...)
来订阅另一个可观察对象。
从技术上讲,如果你仔细想想,你需要做的是从你所有的元素中创建一个Observable,然后把它们压缩在一起,继续你的流的执行。
在伪代码中是这样的:
List<Observable<?>> observables = new ArrayList<>();
for(line in lines){
observables.add(Observable.fromCallable(callAsyncFunction(line));
}
Observable.zip(observables, new Function<...>() { ... }); // kinda like Promise.all()
但是,Observable.from()
可以将可迭代对象中的每个元素公开为对象流,从而消除了循环的需要,这可能并不奇怪。所以你可以创建一个新的Observable,在异步操作完成时调用onCompleted()
,使用Observable.fromCallable()
。之后,你可以等待这些新的可观察对象,将它们收集到一个列表中。
Observable.from(lines)
.flatMap(new Func1<String, Observable<?>>() {
@Override
public Observable<?> call(String line) {
return Observable.fromCallable(callAsyncFunction(line)); // returns Callable
}
}).toList()
.map(new Func1<List<Object>, Object>() {
@Override
public Object call(List<Object> ignored) {
// do something;
}
});
我的后半部分答案很大程度上是基于这个答案。
使用defer将"items to compute"的Iterable转换为observable的Iterable,然后压缩observable。
更困难的部分将是"等待他们全部完成"。正如您可能已经猜到的那样,响应式扩展是关于对内容的"反应",而不是"等待内容发生"。你可以订阅这个可观察对象,它会发出一个项目,然后在每个可观察对象只有一个项目时完成。订阅者可以在等待之后做任何你通常会做的事情;这允许您返回并让您的代码在不阻塞的情况下做它的事情。