在何处创建执行器服务以及何时关闭它们



我正在使用Spring和Jersey创建一个REST服务,我有一个用例,对于我收到的每个请求,我需要对上游API进行几次调用(N)。

我收到一个请求,它有 n 个项目,对于每个项目,我创建一个线程来调用我的依赖项 (REST) 并处理响应。最后,我将所有响应收集在一起,维护顺序,并将它们作为单个响应返回给客户。

我正在使用Java 8的CompletableFuture,并想知道我是否正确使用了ExecutorService框架。

@Component // automatic singleton by Spring
class A {
private ExecutorService executorService = Executors.newCachedThreadPool();
private RawResponse getRawResponse(Item item) {
// make REST call
}

private Response processResponse(RawResponse rawResponse) {
// process response
}
public List<Response> handleRequest(Request request) {
List<CompletableFuture> futureResponses = new ArrayList<>();
for(Item item : request.getItems()) {
CompletableFuture<Response> futureResponse = CompletableFuture.supplyAsync(() -> getRawResponse(item), executorService)
.thenApply(rawResponse -> processResponse(rawResponse))
.handle((response, throwable) {
if(throwable != null) { // log and return default response
} else { return response;}});
futureResponses.add(futureResponse);
}
List<Response> result = new ArrayList<>();
for (CompletableFuture<Resource> futureResponse : futureResponses) {
try {
result.add(futureResponse.get());
} catch (Exception e) {
// log error
}
}
return result;
}
}

我现在的问题是,我是否应该将执行器服务的创建移到上面:

List<CompletableFuture> futureResponses = new ArrayList<>();

并在上面调用关闭:

return result;

因为此时,我并没有真正在任何地方调用关闭,因为该应用程序将始终在其 docker 容器中运行。

继续创建和丢弃池的成本是否高昂,还是当前的方式会泄漏内存?而且我认为将池静态称为私有字段变量是多余的,因为该类无论如何都是春豆(单例)。

任何建议将不胜感激,我也应该使用cachedThreadPool吗?我不确定如何估算我需要的线程数。

我应该将执行器服务的创建移到上面吗?

不,您没有,您的ExecutorService在示例代码中的正确位置。把它想象成一个线程池,你不会想初始化一个新的线程池,并为handleRequest的每个方法调用关闭它。当然,ExecutorService比线程池做更多的工作,实际上它将管理下面的线程池,并为异步任务提供生命周期管理。

我并没有真正在任何地方调用关闭,因为该应用程序将始终在其 docker 容器中运行。

在大多数情况下,您在应用程序启动时启动ExecutorService,并在应用程序关闭时关闭它。因此,您可以将其保留在那里,因为它将在应用程序退出时关闭,或者如果您需要执行优雅的关闭,则可以添加某种shutdown hooks

继续创建和丢弃池的成本是否很高。

有点,我们不想经常创建和丢弃Thread,所以我们有线程池,如果你为每个方法调用创建/丢弃池,那么拥有一个线程池有什么意义。

还是当前的方式会泄漏内存?

不可以,只要您提交的任务不泄漏内存。ExecutorService本身的实现很好用。

而且我认为将池静态称为私有字段变量是多余的,因为该类无论如何都是春豆(单例)

是的,你是对的。您还可以将ExecutorService定义为Spring Bean并将其注入服务Bean,如果要执行一些自定义的初始化过程。

如果我使用 cachedThreadPool,我不确定如何估算我需要的线程数。

这很难说,您需要做一些测试才能为您的应用程序获得正确数量的线程。但是大多数NIO或EventDriven framework的默认可用内核数是线程数的两倍。

当你使用Spring时,你可能希望让它处理异步执行。

只需将@EnableAsync放入@Configuration类之一中,即可在方法上启用@Async注释。

然后,您可以将getRawResponse()更改为

@Async
private CompletableFuture<RawResponse> getRawResponse(Item item) {
// make REST call
return CompletableFuture.completedFuture(rawResponse);
}

(您可能需要将此方法放在单独的服务中以允许正确的代理,具体取决于 AOP 在项目中的配置方式)

并将循环更改为简单

for(Item item : request.getItems()) {
CompletableFuture<Response> futureResponse = getRawResponse(item)
.thenApply(rawResponse -> processResponse(rawResponse))
.handle((response, throwable) {
if(throwable != null) { // log and return default response
} else { return response;}
});
futureResponses.add(futureResponse);
}

如您所见,您不再需要关心服务中的执行程序。

您还可以通过声明其 Spring bean 来自定义执行器,例如:

@SpringBootApplication
@EnableAsync
public class Application extends AsyncConfigurerSupport {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("GithubLookup-");
executor.initialize();
return executor;
}
}

您甚至可以配置多个执行程序,并通过将其名称作为参数提供给@Async注释来选择一个。

另请参阅入门:创建异步方法和@Async注释。

最新更新