Boot Pros,
我最近开始在春季启动程序,我偶然发现了一个问题,我想在哪里得到你的意见。
我努力实现的目标:
- 我创建了一个控制器,它公开了一个名为
nonBlockingEndpoint
的GET
端点。这个nonBlockingEndpoint
执行一个相当长的操作,该操作占用大量资源,可以运行20到40秒。(在所附代码中,它被Thread.sleep()
模拟) - 每当调用
nonBlockingEndpoint
时,spring应用程序都应该注册该调用,并立即向调用者返回操作ID - 然后,调用者可以使用此ID在另一个端点
queryOpStatus
上查询此操作的状态。在开始时,它将被启动,一旦控制器完成了对重用的服务,它将是一个代码,如SERVICE_OK
。然后,调用者知道他的请求已在服务器上成功完成
我找到的解决方案:
- 我有以下控制器(请注意,它明确地而不是标记为@Async)
- 它使用
APIOperationsManager
来注册新操作已启动 - 我使用
CompletableFuture
java构造,通过使用CompletableFuture.supplyAsync(() -> {}
将长时间运行的代码作为一个新的异步进程提供 - 我立即向调用方返回一个响应,告知操作正在进行中
- 异步任务完成后,我使用
cf.thenRun()
通过API操作管理器更新操作状态
这是代码:
@GetMapping(path="/nonBlockingEndpoint")
public @ResponseBody ResponseOperation nonBlocking() {
// Register a new operation
APIOperationsManager apiOpsManager = APIOperationsManager.getInstance();
final int operationID = apiOpsManager.registerNewOperation(Constants.OpStatus.PROCESSING);
ResponseOperation response = new ResponseOperation();
response.setMessage("Triggered non-blocking call, use the operation id to check status");
response.setOperationID(operationID);
response.setOpRes(Constants.OpStatus.PROCESSING);
CompletableFuture<Boolean> cf = CompletableFuture.supplyAsync(() -> {
try {
// Here we will
Thread.sleep(10000L);
} catch (InterruptedException e) {}
// whatever the return value was
return true;
});
cf.thenRun(() ->{
// We are done with the super long process, so update our Operations Manager
APIOperationsManager a = APIOperationsManager.getInstance();
boolean asyncSuccess = false;
try {asyncSuccess = cf.get();}
catch (Exception e) {}
if(true == asyncSuccess) {
a.updateOperationStatus(operationID, Constants.OpStatus.OK);
a.updateOperationMessage(operationID, "success: The long running process has finished and this is your result: SOME RESULT" );
}
else {
a.updateOperationStatus(operationID, Constants.OpStatus.INTERNAL_ERROR);
a.updateOperationMessage(operationID, "error: The long running process has failed.");
}
});
return response;
}
这也是APIOperationsManager.java的完整性:
public class APIOperationsManager {
private static APIOperationsManager instance = null;
private Vector<Operation> operations;
private int currentOperationId;
private static final Logger log = LoggerFactory.getLogger(Application.class);
protected APIOperationsManager() {}
public static APIOperationsManager getInstance() {
if(instance == null) {
synchronized(APIOperationsManager.class) {
if(instance == null) {
instance = new APIOperationsManager();
instance.operations = new Vector<Operation>();
instance.currentOperationId = 1;
}
}
}
return instance;
}
public synchronized int registerNewOperation(OpStatus status) {
cleanOperationsList();
currentOperationId = currentOperationId + 1;
Operation newOperation = new Operation(currentOperationId, status);
operations.add(newOperation);
log.info("Registered new Operation to watch: " + newOperation.toString());
return newOperation.getId();
}
public synchronized Operation getOperation(int id) {
for(Iterator<Operation> iterator = operations.iterator(); iterator.hasNext();) {
Operation op = iterator.next();
if(op.getId() == id) {
return op;
}
}
Operation notFound = new Operation(-1, OpStatus.INTERNAL_ERROR);
notFound.setCrated(null);
return notFound;
}
public synchronized void updateOperationStatus (int id, OpStatus newStatus) {
iteration : for(Iterator<Operation> iterator = operations.iterator(); iterator.hasNext();) {
Operation op = iterator.next();
if(op.getId() == id) {
op.setStatus(newStatus);
log.info("Updated Operation status: " + op.toString());
break iteration;
}
}
}
public synchronized void updateOperationMessage (int id, String message) {
iteration : for(Iterator<Operation> iterator = operations.iterator(); iterator.hasNext();) {
Operation op = iterator.next();
if(op.getId() == id) {
op.setMessage(message);
log.info("Updated Operation status: " + op.toString());
break iteration;
}
}
}
private synchronized void cleanOperationsList() {
Date now = new Date();
for(Iterator<Operation> iterator = operations.iterator(); iterator.hasNext();) {
Operation op = iterator.next();
if((now.getTime() - op.getCrated().getTime()) >= Constants.MIN_HOLD_DURATION_OPERATIONS ) {
log.info("Removed operation from watchlist: " + op.toString());
iterator.remove();
}
}
}
}
我的问题
- 这个概念有效吗?可以改进什么
- 我会遇到并发问题/竞争条件吗
- 有没有更好的方法可以在靴子弹簧中实现同样的效果,但我只是还没有找到?(可能使用@Async指令?)
我很乐意收到您的反馈。
非常感谢,Peter P
提交一个长时间运行的任务并返回一个允许客户端稍后请求结果的id是一种有效的模式。
但我建议重新考虑以下几点:
- 不要使用Integer作为id,因为它允许攻击者猜测id并获得这些id的结果。而是使用随机UUID
- 如果需要重新启动应用程序,所有id及其结果都将丢失。您应该将它们持久化到数据库中
- 您的解决方案将无法在具有许多应用程序实例的集群中工作,因为每个实例都只知道自己的id和结果。这也可以通过将它们持久化到数据库或Reddis存储来解决
- 使用CompletableFuture的方式无法控制用于异步操作的线程数。使用标准Java可以做到这一点,但我建议使用Spring来配置线程池
- 用@Async注释控制器方法不是一个选项,这是行不通的。相反,将所有异步操作放入一个简单的服务中,并用@Async对此进行注释。这有一些优点:
- 您也可以同步使用此服务,这使测试更加容易
- 您可以使用Spring配置线程池
- /nonBlockingEndpoint不应返回id,而是返回到queryOpStatus的完整链接,包括id。客户端可以直接使用此链接,而无需任何其他信息
此外,您可能还想更改一些低级别的实现问题:
- 不要使用Vector,它会在每次操作中同步。请改用列表。在列表上迭代也容易得多,可以用于循环或流
- 如果需要查找值,请不要迭代Vector或List,而是使用Map
- APIOperationsManager是一个单例。这在Spring应用程序中毫无意义。让它成为一个普通的PoJo,并创建一个bean,让它自动连接到控制器中。Spring bean默认情况下是singleton
- 您应该避免在控制器方法中执行复杂的操作。相反,将任何内容移动到服务中(可以用@Async进行注释)。这使得测试更容易,因为您可以在没有web上下文的情况下测试此服务
希望这能有所帮助。
我需要使数据库访问事务化吗
只要只写入/更新一行,就没有必要使其成为事务性的,因为这确实是"原子性的"。
如果一次写入/更新多个行,则应使其成为事务性的,以保证要么更新所有行,要么不更新任何行。
但是,如果两个操作(可能来自两个客户端)更新同一行,则总是最后一个操作获胜。