我们可以通过什么方式使用 RxJava 同步两个异步调用?在下面的示例中,作为 API 调用的方法contentService.listContents
必须先完成,然后才能对每个架构执行processSchema
方法。
schemaService.listSchema()
.toObservable()
.flatMapIterable(schemas -> {
schemas.forEach(schema -> {
// async call
contentService.listContents(schema.getName()).subscribe(contents -> {
doSomethingWithThe(contents);
});
});
// contentService.listContents` must complete first before
// processSchema should be called for each schema
return schemas;
}).subscribe(schema -> { processSchema(schema); },
error -> { Console.error(error.getMessage()); });
processSchema
上面的代码的问题不会等待contentService.listContents
,因为它是异步的,而不是彼此不同步的。
您必须使用flatMap
来处理schemas
,并且由于它是一个列表,因此您必须展开它并再次flatMap
:
schemaService.listSchema()
.toObservable()
.flatMap(schemas ->
Observable.fromIterable(schemas)
.flatMap(schema ->
contentService.listContents(schema.getName())
.doOnNext(contents -> doSomethingWith(contents))
)
// probably you don't care about the inner contents
.ignoreElements()
// andThen will switch to this only when the sequence above completes
.andThen(Observable.just(schemas))
)
.subscribe(
schema -> processSchema(schema),
error -> Console.error(error.getMessage())
);
请注意,您尚未定义服务调用的返回类型,因此您可能必须使用flatMapSingle
和doOnSuccess
例如。
您可能正在寻找flatMap
。
从docs
延续
有时,当一个项目可用时,人们会 喜欢对它执行一些依赖的计算。这是有时 称为延续,取决于应该发生什么和什么 类型涉及,可能涉及各种操作人员来完成。
依靠
最典型的场景是给定一个值,调用 另一个服务,等待并继续其结果:
service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))
通常,后面的序列也需要值 来自早期的映射。这可以通过移动外部来实现 flatMap进入前一个flatMap的内部部分,例如:
service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)