使用Java进行大约20次HTTP调用并将数据传递到数据库



我有一个20项的集合,我将为这些项创建一个循环,并进行API调用以获取数据,根据返回的数据,我将不得不在数据库中更新。这个要求很简单,我可以用简单的Java来完成。

现在,对于性能,我正在学习使用RxJava。我浏览了互联网上的许多文章,发现人们引用async-http-client库进行异步http调用,我发现该库已经过时,维护人员正在计划移交给其他人,RxJava库中给出的库也像2014年开发的一样。由于我是RxJava的新手,你能帮我找到正确的方法吗。

我目前正在获取所有数据,并将其转换为以下的可观测数据

Observable<ENV> envs= Observable.fromIterable(allEnvs);

我还需要得到一些帮助,比如上面的代码很好吗?或者我应该像下面这样为可观察的结构创建,这是groovy中的片段,我必须用Java编写。

val createObserver = Observable.create(ObservableOnSubscribe<String> { emitter ->
emitter.onNext("Hello World")
emitter.onComplete()
})

请帮助我选择最佳方法

假设http调用由以下类表示:

public class HttpCall implements Callable<String> {
private final int i;
private HttpCall(int i) {
this.i = i;
}
@Override
public String call() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Something for : " + i;
}
}

它等待2秒,然后发出一个字符串(http调用结果(。

要组合不同http调用产生的所有项,我们可以使用merge运算符。但在此之前,我们需要使用fromCallable算子将Callable转换为Observable

void sequentially() {
List<Observable<String>> httpRequests = IntStream.range(0, 20)
.mapToObj(HttpCall::new)
.map(Observable::fromCallable)
.collect(Collectors.toList());
Observable.merge(httpRequests)
.timestamp(TimeUnit.SECONDS)
.subscribe(e -> System.out.println("Elapsed time : " + e.time() + " -- " + e.value() + ". Executed on thread : " + Thread.currentThread().getName()));
}

因为所有请求都是在同一个线程上执行的,所以维持顺序:

运行时间:1602122218--用于:0。在线程上执行:main
运行时间:1602122220--用于:1的某些内容。在线程上执行:main
运行时间:1602122222--用于:2的某些内容。在线程上执行:main
。。。

正如你所看到的,这些项目相隔2秒。

要在每个请求自己的线程中运行每个请求,我们需要告诉Rx每个调用都需要一个线程。轻松,只需切换到建议的时间表之一。IO是我们所需要的(因为它是一个IO操作(。

void parallel( {
List<Observable<String>> httpRequests = IntStream.range(0, 20)
.mapToObj(HttpCall::new)
.map(httpCall -> Observable.fromCallable(httpCall)
.subscribeOn(Schedulers.io())
) // take a thread from the IO pool
.collect(Collectors.toList());
Observable.merge(httpRequests)
.timestamp(TimeUnit.SECONDS)
.subscribe(e -> System.out.println("Elapsed time : " + e.time() + " -- " + e.value() + ". Executed on thread : " + Thread.currentThread().getName()));
}

这一次订单没有得到保证,它们几乎同时生产:

运行时间:1602123707--用于:2。在线程上执行:RxCachedThreadScheduler-3
运行时间:1602123707--0的某些内容。在线程上执行:RxCachedThreadScheduler-1
运行时间:1602123707--用于:1。在线程上执行:RxCachedThreadScheduler-1
。。。


代码可以缩短为:

Observable.range(0, 20)
.map(HttpCall::new)
.flatMap(httpCall -> Observable.fromCallable(httpCall).subscribeOn(Schedulers.io()))
.timestamp(TimeUnit.SECONDS)
.subscribe(e -> System.out.println("Elapsed time : " + e.time() + " -- " + e.value() + ". Executed on thread : " + Thread.currentThread().getName()));

merge在幕后使用flatMap

最新更新