执行多次下载并等待所有下载完成



我目前正在开发一个API服务,该服务允许1个或多个用户从S3存储桶下载1个或更多项目并将内容返回给用户。虽然下载很好,但下载几个文件所需的时间相当于文件数量的100-150毫秒。

我已经尝试了几种方法来加快速度——parallelStream()而不是stream()(考虑到同时下载的数量,它有耗尽线程的严重风险),以及CompleteableFutures,甚至创建了一个ExecutorService,进行下载,然后关闭池。通常,我只想要几个并发任务,例如每个请求同时执行5个任务,以尝试减少活动线程的数量。

我尝试过集成Spring@Cacheable来将下载的文件存储到Redis(文件是只读的)-虽然这确实减少了响应时间(检索文件需要几毫秒,而检索文件需要100-150毫秒),但只有在之前检索过文件后,好处才会显现出来。

考虑到我不想(或不认为我可以)让数百个线程同时打开http连接并下载,处理等待多个异步任务完成然后获得结果的最佳方法是什么?

您应该考虑捆绑并行流中默认使用的公共fork/join池,因为我认为它用于Stream api之外的其他操作,如排序操作。您可以为stream创建自己的fork/join池,而不是用I/O绑定的并行流来饱和公共的fork/join池。请参阅此问题,了解如何创建一个具有所需大小的临时ForkJoinPool,并在其中运行并行流。

您还可以创建一个具有固定大小线程池的ExecutorService,该线程池也将独立于公共fork/join池,并将仅使用池中的线程来限制请求。它还允许您指定要专用的线程数:

ExecutorService executor = Executors.newFixedThreadPool(MAX_THREADS_FOR_DOWNLOADS);
try {
    List<CompletableFuture<Path>> downloadTasks = s3Paths
            .stream()
            .map(s3Path -> completableFuture.supplyAsync(() -> mys3Downloader.downloadAndGetPath(s3Path), executor))
            .collect(Collectors.toList());    
        // at this point, all requests are enqueued, and threads will be assigned as they become available      
        executor.shutdown();    // stops accepting requests, does not interrupt threads, 
                                // items in queue will still get threads when available
        // wait for all downloads to complete
        CompletableFuture.allOf(downloadTasks.toArray(new CompletableFuture[downloadTasks.size()])).join();
        // at this point, all downloads are finished, 
        // so it's safe to shut down executor completely
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    } finally {
        executor.shutdownNow(); // important to call this when you're done with the executor.
    }

在@Hank D的带领下,您可以封装executor服务的创建,以确保在使用所述executor:后确实调用ExecutorService::shutdownNow

private static <VALUE> VALUE execute(
  final int nThreads,
  final Function<ExecutorService, VALUE> function
) {
  ExecutorService executorService = Executors.newFixedThreadPool(nThreads);
  try {
    return function.apply(executorService);
  } catch (final InterruptedException | ExecutionException exception) {
    exception.printStackTrace();
  } finally {
    executorService .shutdownNow(); // important to call this when you're done with the executor service.
  }
}
public static void main(final String... arguments) {
  // define variables
  final List<CompletableFuture<Path>> downloadTasks = execute(
    MAX_THREADS_FOR_DOWNLOADS,
    executor -> s3Paths
      .stream()
      .map(s3Path -> completableFuture.supplyAsync(
        () -> mys3Downloader.downloadAndGetPath(s3Path),
        executor
      ))
      .collect(Collectors.toList())
  );
  // use downloadTasks
}

相关内容

  • 没有找到相关文章

最新更新