我正在使用Spring@RequestMapping来处理消费和生成JSON的REST同步服务。我现在想添加异步响应,即客户端发送一个id列表,服务器只将详细信息发送回一个客户端。
我找了一段时间,没有找到我要找的东西。对于Spring,我看到了两种不同的方法。最常见的是消息代理方法,在这种方法中,似乎每个人都通过订阅队列或主题来获得每条消息。这是非常不可接受的,因为这是私人数据。我还有有限数量的数据点要返回。另一种方法是Callable、AsyncResult或DeferredResult。这似乎是为了将数据保密,但我想发送多个响应。
我看到了一些类似于我想要的东西,但在服务器上使用了Jersey SSE。我想坚持春天。
这就是我目前使用的伪代码。
@RequestMapping(value = BASE_PATH + "/balances", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public GetAccountBalancesResponse getAccountBalances(@RequestBody GetAccountBalancesRequest request) {
GetAccountBalancesResponse ret = new GetAccountBalancesResponse();
ret.setBalances(synchronousService.getBalances(request.getIds());
return ret;
}
这正是我想要做的。由于我对细节一无所知,所以相当粗糙。一旦我明白了发送,我将处理异步部分,但会接受任何建议。
@RequestMapping(value = BASE_PATH + "/balances", method = RequestMethod.POST, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE)
public ???<BalanceDetails> getAccountBalances(@RequestBody GetAccountBalancesRequest request) {
final ???<BalanceDetails> ret = new ???<>();
new Thread(new Runnable() {
public void run() {
List<Future<BalanceDetails>> futures = asynchronousService.getBalances(request.getIds());
while(!stillWaiting(futures)) {
// Probably use something like a Condition to block until there is some details.
ret.send(getFinishedDetails(futures));
}
ret.close();
}
}).start();
return ret;
}
谢谢,韦斯。
它不是这样工作的:您使用的是纯Spring操作,这些操作旨在在单个线程中处理,可能会阻塞,直到计算出完整的请求。您不会在控制器中创建线程,或者至少不会以这种方式创建线程。
如果计算持续很长时间,并且你想给用户一个视觉反馈,那么以下是步骤:
- 优化过程:)使用索引、缓存等
-
如果仍然不够,计算仍然会永远持续,并且用户需要反馈,那么有两个选项
- 使用javascript进行民意调查并显示视觉反馈(更容易)。基本上,您将任务提交到线程池并立即返回,还有另一个控制器方法读取计算的当前状态并将其返回给用户。javascript每隔10秒左右调用一次此方法
- 使用反向通道(服务器推送、websocket)并不容易,因为您必须同时实现客户端和服务器部分。显然,有些库和协议只会使这段代码变长几行,但如果你以前从未尝试过,你会花一些时间来理解设置和调试websocket并不像常规HTTP那样容易,因为有调试工具
花了很多功夫,但看起来SpringWeb4.2确实支持服务器端事件。我使用的是Spring Boot 1.2.7,它使用Spring Web 4.1.7。切换到Spring Boot 1.3.0.RC1会添加SseEmitter。
这是我的伪代码。
@RequestMapping(value = BASE_PATH + "/getAccountBalances", method = RequestMethod.GET)
public SseEmitter getAccountBalances(@QueryParam("accountId") Integer[] accountIds) {
final SseEmitter emitter = new SseEmitter();
new Thread(new Runnable() {
@Override
public void run() {
try {
for (int xx = 0; xx < ids.length; xx++) {
Thread.sleep(2000L + rand.nextInt(2000));
BalanceDetails balance = new BalanceDetails();
...
emitter.send(emitter.event().name("accountBalance").id(String.valueOf(accountIds[xx]))
.data(balance, MediaType.APPLICATION_JSON));
}
emitter.send(SseEmitter.event().name("complete").data("complete"));
emitter.complete();
} catch (Exception ee) {
ee.printStackTrace();
emitter.completeWithError(ee);
}
}
}).start();
return emitter;
}
仍然可以优雅地关闭通道,并使用Jersey EventSource解析JSON对象,但它比消息总线要好得多。
生成一个新线程和使用睡眠也只是为了POC。我不需要任何一个,因为我们已经有了一个异步进程来访问一个缓慢的后端系统。
韦斯。