我正在使用Spring和Jersey创建一个REST服务,我有一个用例,对于我收到的每个请求,我需要对上游API进行几次调用(N)。
我收到一个请求,它有 n 个项目,对于每个项目,我创建一个线程来调用我的依赖项 (REST) 并处理响应。最后,我将所有响应收集在一起,维护顺序,并将它们作为单个响应返回给客户。
我正在使用Java 8的CompletableFuture
,并想知道我是否正确使用了ExecutorService
框架。
@Component // automatic singleton by Spring
class A {
private ExecutorService executorService = Executors.newCachedThreadPool();
private RawResponse getRawResponse(Item item) {
// make REST call
}
private Response processResponse(RawResponse rawResponse) {
// process response
}
public List<Response> handleRequest(Request request) {
List<CompletableFuture> futureResponses = new ArrayList<>();
for(Item item : request.getItems()) {
CompletableFuture<Response> futureResponse = CompletableFuture.supplyAsync(() -> getRawResponse(item), executorService)
.thenApply(rawResponse -> processResponse(rawResponse))
.handle((response, throwable) {
if(throwable != null) { // log and return default response
} else { return response;}});
futureResponses.add(futureResponse);
}
List<Response> result = new ArrayList<>();
for (CompletableFuture<Resource> futureResponse : futureResponses) {
try {
result.add(futureResponse.get());
} catch (Exception e) {
// log error
}
}
return result;
}
}
我现在的问题是,我是否应该将执行器服务的创建移到上面:
List<CompletableFuture> futureResponses = new ArrayList<>();
并在上面调用关闭:
return result;
因为此时,我并没有真正在任何地方调用关闭,因为该应用程序将始终在其 docker 容器中运行。
继续创建和丢弃池的成本是否高昂,还是当前的方式会泄漏内存?而且我认为将池静态称为私有字段变量是多余的,因为该类无论如何都是春豆(单例)。
任何建议将不胜感激,我也应该使用cachedThreadPool吗?我不确定如何估算我需要的线程数。
我应该将执行器服务的创建移到上面吗?
不,您没有,您的ExecutorService
在示例代码中的正确位置。把它想象成一个线程池,你不会想初始化一个新的线程池,并为handleRequest
的每个方法调用关闭它。当然,ExecutorService
比线程池做更多的工作,实际上它将管理下面的线程池,并为异步任务提供生命周期管理。
我并没有真正在任何地方调用关闭,因为该应用程序将始终在其 docker 容器中运行。
在大多数情况下,您在应用程序启动时启动ExecutorService
,并在应用程序关闭时关闭它。因此,您可以将其保留在那里,因为它将在应用程序退出时关闭,或者如果您需要执行优雅的关闭,则可以添加某种shutdown hooks
。
继续创建和丢弃池的成本是否很高。
有点,我们不想经常创建和丢弃Thread
,所以我们有线程池,如果你为每个方法调用创建/丢弃池,那么拥有一个线程池有什么意义。
还是当前的方式会泄漏内存?
不可以,只要您提交的任务不泄漏内存。ExecutorService
本身的实现很好用。
而且我认为将池静态称为私有字段变量是多余的,因为该类无论如何都是春豆(单例)
是的,你是对的。您还可以将ExecutorService
定义为Spring Bean并将其注入服务Bean,如果要执行一些自定义的初始化过程。
如果我使用 cachedThreadPool,我不确定如何估算我需要的线程数。
这很难说,您需要做一些测试才能为您的应用程序获得正确数量的线程。但是大多数NIO或EventDriven framework的默认可用内核数是线程数的两倍。
当你使用Spring时,你可能希望让它处理异步执行。
只需将@EnableAsync
放入@Configuration
类之一中,即可在方法上启用@Async
注释。
然后,您可以将getRawResponse()
更改为
@Async
private CompletableFuture<RawResponse> getRawResponse(Item item) {
// make REST call
return CompletableFuture.completedFuture(rawResponse);
}
(您可能需要将此方法放在单独的服务中以允许正确的代理,具体取决于 AOP 在项目中的配置方式)
并将循环更改为简单
for(Item item : request.getItems()) {
CompletableFuture<Response> futureResponse = getRawResponse(item)
.thenApply(rawResponse -> processResponse(rawResponse))
.handle((response, throwable) {
if(throwable != null) { // log and return default response
} else { return response;}
});
futureResponses.add(futureResponse);
}
如您所见,您不再需要关心服务中的执行程序。
您还可以通过声明其 Spring bean 来自定义执行器,例如:
@SpringBootApplication
@EnableAsync
public class Application extends AsyncConfigurerSupport {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(2);
executor.setMaxPoolSize(2);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("GithubLookup-");
executor.initialize();
return executor;
}
}
您甚至可以配置多个执行程序,并通过将其名称作为参数提供给@Async
注释来选择一个。
另请参阅入门:创建异步方法和@Async注释。