使用 RxJava 同步两个异步 API 调用



我们可以通过什么方式使用 RxJava 同步两个异步调用?在下面的示例中,作为 API 调用的方法contentService.listContents必须先完成,然后才能对每个架构执行processSchema方法。

schemaService.listSchema()
.toObservable()
.flatMapIterable(schemas -> {
schemas.forEach(schema -> {
// async call
contentService.listContents(schema.getName()).subscribe(contents -> {
doSomethingWithThe(contents); 
});
});
// contentService.listContents` must complete first before 
// processSchema should be called for each schema
return schemas;
}).subscribe(schema -> { processSchema(schema); }, 
error -> { Console.error(error.getMessage()); });

processSchema上面的代码的问题不会等待contentService.listContents,因为它是异步的,而不是彼此不同步的。

您必须使用flatMap来处理schemas,并且由于它是一个列表,因此您必须展开它并再次flatMap

schemaService.listSchema()
.toObservable()
.flatMap(schemas -> 
Observable.fromIterable(schemas)
.flatMap(schema -> 
contentService.listContents(schema.getName())
.doOnNext(contents -> doSomethingWith(contents))
)
// probably you don't care about the inner contents
.ignoreElements()
// andThen will switch to this only when the sequence above completes
.andThen(Observable.just(schemas))
)
.subscribe(
schema -> processSchema(schema), 
error -> Console.error(error.getMessage())
);

请注意,您尚未定义服务调用的返回类型,因此您可能必须使用flatMapSingledoOnSuccess例如。

您可能正在寻找flatMap

docs

延续

有时,当一个项目可用时,人们会 喜欢对它执行一些依赖的计算。这是有时 称为延续,取决于应该发生什么和什么 类型涉及,可能涉及各种操作人员来完成。

依靠

最典型的场景是给定一个值,调用 另一个服务,等待并继续其结果:

service.apiCall()
.flatMap(value -> service.anotherApiCall(value))
.flatMap(next -> service.finalCall(next))

通常,后面的序列也需要值 来自早期的映射。这可以通过移动外部来实现 flatMap进入前一个flatMap的内部部分,例如:

service.apiCall()
.flatMap(value ->
service.anotherApiCall(value)
.flatMap(next -> service.finalCallBoth(value, next))
)

最新更新