我必须修改dropwizard应用程序以改善其运行时间。基本上,这个应用程序每天接收大约300万个url,下载并解析它们以检测恶意内容。问题是应用程序只能处理100万个url。当我查看应用程序时,我发现它正在进行许多顺序调用。我想知道如何通过异步或其他技术来改进应用程序。
所需代码如下:-
/* Scheduler */
private long triggerDetection(String startDate, String endDate) {
for (UrlRequest request : urlRequests) {
if (!validateRequests.isWhitelisted(request)) {
ContentDetectionClient.detectContent(request);
}
}
}
/* Client */
public void detectContent(UrlRequest urlRequest){
Client client = new Client();
URI uri = buildUrl(); /* It returns the URL of this dropwizard application's resource method provided below */
ClientResponse response = client.resource(uri)
.type(MediaType.APPLICATION_JSON_TYPE)
.post(ClientResponse.class, urlRequest);
Integer status = response.getStatus();
if (status >= 200 && status < 300) {
log.info("Completed request for url: {}", urlRequest.getUrl());
}else{
log.error("request failed for url: {}", urlRequest.getUrl());
}
}
private URI buildUrl() {
return UriBuilder
.fromPath(uriConfiguration.getUrl())
.build();
}
/* Resource Method */
@POST
@Path("/pageDetection")
@Consumes(MediaType.APPLICATION_JSON)
@Produces(MediaType.APPLICATION_JSON)
/**
* Receives the url of the publisher, crawls the content of that url, applies a detector to check if the content is malicious.
* @returns returns the probability of the page being malicious
* @throws throws exception if the crawl call failed
**/
public DetectionScore detectContent(UrlRequest urlRequest) throws Exception {
return contentAnalysisOrchestrator.detectContentPage(urlRequest);
}
/* Orchestrator */
public DetectionScore detectContentPage(UrlRequest urlRequest) {
try {
Pair<Integer, HtmlPage> response = crawler.rawLoad(urlRequest.getUrl());
String content = response.getValue().text();
DetectionScore detectionScore = detector.getProbability(urlRequest.getUrl(), content);
contentDetectionResultDao.insert(urlRequest.getAffiliateId(), urlRequest.getUrl(),detectionScore.getProbability()*1000,
detectionScore.getRecommendation(), urlRequest.getRequestsPerUrl(), -1, urlRequest.getCreatedAt() );
return detectionScore;
} catch (IOException e) {
log.info("Error while analyzing the url : {}", e);
throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
}
}
我正在考虑以下方法:-
不是通过POST调用dropwizard资源方法,而是直接从调度程序调用
orchestrator.detectContent(urlRequest)
。编排器可以返回detectionScore,我将把所有detectScores存储在一个映射/表中,并执行批处理数据库插入,而不是像现在的代码中那样单独插入。
我想对上述方法和其他可能改善运行时间的技术提出一些意见。另外,我刚刚读了Java异步编程,但似乎不能理解如何在上面的代码中使用它,所以也希望得到一些帮助。
谢谢。
编辑:我能想到两个瓶颈:
- 网页下载
- 将结果插入数据库(数据库位于另一个系统)
- 似乎处理是一次执行一个URL
系统有8gb内存,其中4gb似乎是空闲的
$ free -m
total used free shared buffers cached
Mem: 7843 4496 3346 0 193 2339
-/+ buffers/cache: 1964 5879
Swap: 1952 489 1463
CPU使用率也最小:
top - 13:31:19 up 19 days, 15:39, 3 users, load average: 0.00, 0.00, 0.00
Tasks: 215 total, 1 running, 214 sleeping, 0 stopped, 0 zombie
Cpu(s): 0.5%us, 0.0%sy, 0.0%ni, 99.4%id, 0.1%wa, 0.0%hi, 0.0%si, 0.0%st
Mem: 8031412k total, 4605196k used, 3426216k free, 198040k buffers
Swap: 1999868k total, 501020k used, 1498848k free, 2395344k cached
首先检查你最容易丢失的地方。
我想大部分时间都浪费在下载url上了。
如果下载url超过90%的时间,你可能无法改善你的应用程序,因为瓶颈不是java,而是你的网络。
仅当下载时间低于网络能力
时考虑以下内容如果下载时间不是很高,你可以试着提高你的性能。一个标准的方法是使用生产者消费者链。
基本上你可以这样划分你的工作:
Downloading --> Parsing --> Saving
下载是生产者,解析是下载过程的消费者,保存过程的生产者,保存是消费者。
每个步骤可以由不同数量的线程执行。例如,你可以有3个下载线程,5个解析线程和1个保存线程。
注释后编辑
假设瓶颈不是cpu时间,所以对java代码进行干预是不重要的。
如果你知道你每天下载多少gb,就可以看看它们是否接近你网络的最大带宽。
如果发生这种情况,有不同的可能性:
- 请求使用
Content-Encoding: gzip
压缩内容(以减少使用的带宽) - 在不同网络上工作的不同节点之间拆分应用程序(因此在不同网络之间拆分带宽)
- 更新你的带宽(因此增加你的网络带宽)
- 确保只下载请求的内容(所以没有javascript,图像,css等,如果没有请求)(以尽量减少带宽的使用)
- 前面解决方案的组合
受Davide(伟大的)答案的启发,这里有一个示例,使用simple-react(我编写的库)来并行化它的简单方法。注意,这里有一点不同,使用客户机来驱动服务器上的并发。
LazyReact streamBuilder = new LazyReact(15,15);
streamBuilder.fromIterable(urlRequests)
.filter(urlReq->!validateRequests.isWhitelisted(urlReq))
.forEach(request -> {
ContentDetectionClient.detectContent(request);
});
看起来你可以从客户端驱动并发。这意味着您可以在服务器端的线程之间分配工作,而不需要额外的工作。在这个例子中,我们有15个并发请求,但您可以将其设置为接近服务器可以处理的最大请求。您的应用程序是IO绑定的,因此您可以使用大量线程来提高性能。
simple-react作为一个期货流。因此,这里我们为每次调用ContentDetection客户端创建一个Async任务。我们有15个线程可用,因此一次可以对服务器进行15个调用。
Java 7JDK 8有一个支持Java 7的功能,叫做StreamSupport,你也可以通过RetroLambda来支持Lambda表达式。
要用CompletableFutures实现相同的解决方案,我们可以为每个符合条件的URL创建一个Future Task。UPDATE我认为我们不需要批量处理它们,我们可以使用Executor来限制活动期货的数量。我们只需要在最后把它们全部连接起来。
Executor exec = Executors.newFixedThreadPool(maxActive);//15 threads
List<CompletableFuture<Void>> futures= new ArrayList<>();
for (UrlRequest request : urlRequests) {
if (!validateRequests.isWhitelisted(request)) {
futures.add(CompletableFuture.runAsync(()->ContentDetectionClient.detectContent(request), exec));
}
}
CompletableFuture.allOf(futures.toArray())
.join();