Java:使用异步编程优化应用程序



我必须修改dropwizard应用程序以改善其运行时间。基本上,这个应用程序每天接收大约300万个url,下载并解析它们以检测恶意内容。问题是应用程序只能处理100万个url。当我查看应用程序时,我发现它正在进行许多顺序调用。我想知道如何通过异步或其他技术来改进应用程序。

所需代码如下:-

/* Scheduler */
private long triggerDetection(String startDate, String endDate) {
for (UrlRequest request : urlRequests) {
                if (!validateRequests.isWhitelisted(request)) {
                    ContentDetectionClient.detectContent(request);
                }
            }
}
/* Client */
public void detectContent(UrlRequest urlRequest){
        Client client = new Client();
        URI uri = buildUrl(); /* It returns the URL of this dropwizard application's resource method provided below */
        ClientResponse response = client.resource(uri)
                .type(MediaType.APPLICATION_JSON_TYPE)
                .post(ClientResponse.class, urlRequest);
        Integer status = response.getStatus();
        if (status >= 200 && status < 300) {
            log.info("Completed request for url: {}", urlRequest.getUrl());
        }else{
            log.error("request failed for url: {}", urlRequest.getUrl());
        }
    }
    private URI buildUrl() {
        return UriBuilder
                .fromPath(uriConfiguration.getUrl())
                .build();
    }
/* Resource Method */
 @POST
    @Path("/pageDetection")
    @Consumes(MediaType.APPLICATION_JSON)
    @Produces(MediaType.APPLICATION_JSON)
    /**
     * Receives the url of the publisher, crawls the content of that url, applies a detector to check if the content is malicious.
     * @returns returns the probability of the page being malicious
     * @throws throws exception if the crawl call failed
     **/
    public DetectionScore detectContent(UrlRequest urlRequest) throws Exception {
        return contentAnalysisOrchestrator.detectContentPage(urlRequest);
    }
/* Orchestrator */
public DetectionScore detectContentPage(UrlRequest urlRequest) {
        try {
            Pair<Integer, HtmlPage> response =  crawler.rawLoad(urlRequest.getUrl());
            String content =   response.getValue().text();
            DetectionScore detectionScore = detector.getProbability(urlRequest.getUrl(), content);
            contentDetectionResultDao.insert(urlRequest.getAffiliateId(), urlRequest.getUrl(),detectionScore.getProbability()*1000,
                    detectionScore.getRecommendation(), urlRequest.getRequestsPerUrl(), -1, urlRequest.getCreatedAt() );
            return detectionScore;
        } catch (IOException e) {
            log.info("Error while analyzing the url : {}", e);
            throw new WebApplicationException(e, Response.Status.INTERNAL_SERVER_ERROR);
        }
    }

我正在考虑以下方法:-

  • 不是通过POST调用dropwizard资源方法,而是直接从调度程序调用orchestrator.detectContent(urlRequest)

  • 编排器可以返回detectionScore,我将把所有detectScores存储在一个映射/表中,并执行批处理数据库插入,而不是像现在的代码中那样单独插入。

我想对上述方法和其他可能改善运行时间的技术提出一些意见。另外,我刚刚读了Java异步编程,但似乎不能理解如何在上面的代码中使用它,所以也希望得到一些帮助。

谢谢。

编辑:我能想到两个瓶颈:

  • 网页下载
  • 将结果插入数据库(数据库位于另一个系统)
  • 似乎处理是一次执行一个URL

系统有8gb内存,其中4gb似乎是空闲的

$ free -m
             total       used       free     shared    buffers     cached
Mem:          7843       4496       3346          0        193       2339
-/+ buffers/cache:       1964       5879 
Swap:         1952        489       1463 

CPU使用率也最小:

top - 13:31:19 up 19 days, 15:39,  3 users,  load average: 0.00, 0.00, 0.00
Tasks: 215 total,   1 running, 214 sleeping,   0 stopped,   0 zombie
Cpu(s):  0.5%us,  0.0%sy,  0.0%ni, 99.4%id,  0.1%wa,  0.0%hi,  0.0%si,  0.0%st
Mem:   8031412k total,  4605196k used,  3426216k free,   198040k buffers
Swap:  1999868k total,   501020k used,  1498848k free,  2395344k cached

首先检查你最容易丢失的地方。

我想大部分时间都浪费在下载url上了。

如果下载url超过90%的时间,你可能无法改善你的应用程序,因为瓶颈不是java,而是你的网络。


仅当下载时间低于网络能力

时考虑以下内容

如果下载时间不是很高,你可以试着提高你的性能。一个标准的方法是使用生产者消费者链。

基本上你可以这样划分你的工作:

Downloading --> Parsing --> Saving 

下载是生产者,解析是下载过程的消费者,保存过程的生产者,保存是消费者。

每个步骤可以由不同数量的线程执行。例如,你可以有3个下载线程,5个解析线程和1个保存线程。


注释后编辑

假设瓶颈不是cpu时间,所以对java代码进行干预是不重要的。

如果你知道你每天下载多少gb,就可以看看它们是否接近你网络的最大带宽。

如果发生这种情况,有不同的可能性:

  • 请求使用Content-Encoding: gzip压缩内容(以减少使用的带宽)
  • 在不同网络上工作的不同节点之间拆分应用程序(因此在不同网络之间拆分带宽)
  • 更新你的带宽(因此增加你的网络带宽)
  • 确保只下载请求的内容(所以没有javascript,图像,css等,如果没有请求)(以尽量减少带宽的使用)
  • 前面解决方案的组合

受Davide(伟大的)答案的启发,这里有一个示例,使用simple-react(我编写的库)来并行化它的简单方法。注意,这里有一点不同,使用客户机来驱动服务器上的并发。

LazyReact streamBuilder = new LazyReact(15,15);
streamBuilder.fromIterable(urlRequests)
      .filter(urlReq->!validateRequests.isWhitelisted(urlReq))
      .forEach(request -> {
           ContentDetectionClient.detectContent(request);
       });

看起来你可以从客户端驱动并发。这意味着您可以在服务器端的线程之间分配工作,而不需要额外的工作。在这个例子中,我们有15个并发请求,但您可以将其设置为接近服务器可以处理的最大请求。您的应用程序是IO绑定的,因此您可以使用大量线程来提高性能。

simple-react作为一个期货流。因此,这里我们为每次调用ContentDetection客户端创建一个Async任务。我们有15个线程可用,因此一次可以对服务器进行15个调用。

Java 7

JDK 8有一个支持Java 7的功能,叫做StreamSupport,你也可以通过RetroLambda来支持Lambda表达式。

要用CompletableFutures实现相同的解决方案,我们可以为每个符合条件的URL创建一个Future Task。UPDATE我认为我们不需要批量处理它们,我们可以使用Executor来限制活动期货的数量。我们只需要在最后把它们全部连接起来。

   Executor exec = Executors.newFixedThreadPool(maxActive);//15 threads
   List<CompletableFuture<Void>> futures= new ArrayList<>();
   for (UrlRequest request : urlRequests) {
            if (!validateRequests.isWhitelisted(request)) {
                futures.add(CompletableFuture.runAsync(()->ContentDetectionClient.detectContent(request), exec));
            }
        }
 CompletableFuture.allOf(futures.toArray())
                      .join();

最新更新