使用 RxJava2 从一系列异步 HTTP 请求创建具有背压的可流动请求



我想创建一个Flowable来流式传输从按块发送结果的同步 REST API 端点接收的结果(有必要发送另一个请求以使用上一个响应的标头获取剩余的结果列表(。它不想使用背压策略,而是仅在必要时(在 Flowable 中请求时(等待发送后续 HTTP 请求。

函数Flowable.generate非常接近我想要的,但它似乎仅适用于同步调用。是否有这样的异步调用解决方案,或者您有任何建议吗?

谢谢

我不确定我是否正确理解了你,但我想你想一个接一个地执行一个请求,你需要来自上一个请求的一些数据来执行下一个请求。

假设我们有以下Flowable对象:

Flowable<String> first = Flowable.fromCallable(() -> {
Thread.sleep(2000);
return "I need to be executed first";
});
Flowable<String> second = Flowable.fromCallable(() -> "I need to be executed later");

首先Flowable会更慢,但我们想在first之后执行second

我们可以使用以下运算符concat(...)执行此操作:

Flowable.concat(first, second).subscribe(System.out::println);

根据 RxJava 文档,concat 运算符从两个或多个可观察量发出排放而不交错它们,因此请求将以正确的顺序执行。

您可能还希望根据第一个请求的条件或在第二个Flowable中使用第一个Flowable中的一些数据来执行第二个请求。

您可以使用flatMap(...)运算符实现这一点,如下所示:

first.flatMap(
s -> s.equals("I need to be executed first")
? second
: Flowable.empty())
.subscribe(System.out::println);

在此示例中,仅当第一个Flowable完全符合我们的意愿时,才会发出第二个。如果不是,将发出空Flowable。如果要在第二个运算符中使用第一个Flowable中的数据,可以在flatMap(...)运算符中创建新的自定义Flowable,而不是像这样返回secondFlowable

first.flatMap(
s -> s.equals("I need to be executed first")
? Flowable.fromCallable(() -> "I'm using data from the first Flowable: ".concat(s))
: Flowable.empty())
.subscribe(System.out::println);

我希望,这个答案会对你有所帮助。

最新更新