我设法设置了一个从Undertow HTTP处理程序调用的Hystrix命令:
public void handleRequest(HttpServerExchange exchange) throws Exception {
if (exchange.isInIoThread()) {
exchange.dispatch(this);
return;
}
RpcClient rpcClient = new RpcClient(/* ... */);
try {
byte[] response = new RpcCommand(rpcClient).execute();
// send the response
} catch (Exception e) {
// send an error
}
}
这很好用。但是现在,我想使用 Hystrix 的可观察功能,调用 observe
而不是 execute
,使代码非阻塞。
public void handleRequest(HttpServerExchange exchange) throws Exception {
RpcClient rpcClient = new RpcClient(/* ... */);
new RpcCommand(rpcClient).observe().subscribe(new Observer<byte[]>(){
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable throwable) {
exchange.setStatusCode(StatusCodes.INTERNAL_SERVER_ERROR);
exchange.endExchange();
}
@Override
public void onNext(byte[] body) {
exchange.getResponseHeaders().add(Headers.CONTENT_TYPE, "text/plain");
exchange.getResponseSender().send(ByteBuffer.wrap(body));
}
});
}
正如预期的那样(读取文档(,处理程序立即返回,因此,交换结束;当执行onNext
回调时,它失败并出现异常:
Caused by: java.lang.IllegalStateException: UT000127: Response has already been sent
at io.undertow.io.AsyncSenderImpl.send(AsyncSenderImpl.java:122)
at io.undertow.io.AsyncSenderImpl.send(AsyncSenderImpl.java:272)
at com.xxx.poc.undertow.DiyServerBootstrap$1$1.onNext(DiyServerBootstrap.java:141)
at com.xxx.poc.undertow.DiyServerBootstrap$1$1.onNext(DiyServerBootstrap.java:115)
at rx.internal.util.ObserverSubscriber.onNext(ObserverSubscriber.java:34)
有没有办法告诉 Undertow 处理程序正在异步执行 IO?我希望使用大量非阻塞代码来访问数据库和其他服务。
提前感谢!
您应该dispatch()
一个 Runnable,以便在 handleRequest
方法返回时交换不会结束。由于客户端和订阅的创建是非常简单的任务,因此您可以使用如下SameThreadExecutor.INSTANCE
在同一线程上执行此操作:
public void handleRequest(HttpServerExchange exchange) throws Exception {
exchange.dispatch(SameThreadExecutor.INSTANCE, () -> {
RpcClient rpcClient = new RpcClient(/* ... */);
new RpcCommand(rpcClient).observe().subscribe(new Observer<byte[]>(){
//...
});
});
}
(如果不将执行程序传递给dispatch()
,它会将其调度到 XNIO 工作线程池。如果您希望在自己的执行器上执行客户端创建和订阅,则应改为传递它。