Undertow :在 Http 处理程序中使用 HystrixObservable



我设法设置了一个从Undertow HTTP处理程序调用的Hystrix命令:

public void handleRequest(HttpServerExchange exchange) throws Exception {
    if (exchange.isInIoThread()) {
        exchange.dispatch(this);
        return;
    }
    RpcClient rpcClient = new RpcClient(/* ... */);
    try {
        byte[] response = new RpcCommand(rpcClient).execute();
        // send the response
    } catch (Exception e) {
        // send an error
    }
}

这很好用。但是现在,我想使用 Hystrix 的可观察功能,调用 observe 而不是 execute ,使代码非阻塞。

public void handleRequest(HttpServerExchange exchange) throws Exception {
    RpcClient rpcClient = new RpcClient(/* ... */);
    new RpcCommand(rpcClient).observe().subscribe(new Observer<byte[]>(){
        @Override
        public void onCompleted() {
        }
        @Override
        public void onError(Throwable throwable) {
            exchange.setStatusCode(StatusCodes.INTERNAL_SERVER_ERROR);
            exchange.endExchange();
        }
        @Override
        public void onNext(byte[] body) {
            exchange.getResponseHeaders().add(Headers.CONTENT_TYPE, "text/plain");
            exchange.getResponseSender().send(ByteBuffer.wrap(body));
        }
    });
}

正如预期的那样(读取文档(,处理程序立即返回,因此,交换结束;当执行onNext回调时,它失败并出现异常:

Caused by: java.lang.IllegalStateException: UT000127: Response has already been sent
at io.undertow.io.AsyncSenderImpl.send(AsyncSenderImpl.java:122)
at io.undertow.io.AsyncSenderImpl.send(AsyncSenderImpl.java:272)
at com.xxx.poc.undertow.DiyServerBootstrap$1$1.onNext(DiyServerBootstrap.java:141)
at com.xxx.poc.undertow.DiyServerBootstrap$1$1.onNext(DiyServerBootstrap.java:115)
at rx.internal.util.ObserverSubscriber.onNext(ObserverSubscriber.java:34)

有没有办法告诉 Undertow 处理程序正在异步执行 IO?我希望使用大量非阻塞代码来访问数据库和其他服务。

提前感谢!

您应该dispatch()一个 Runnable,以便在 handleRequest 方法返回时交换不会结束。由于客户端和订阅的创建是非常简单的任务,因此您可以使用如下SameThreadExecutor.INSTANCE在同一线程上执行此操作:

public void handleRequest(HttpServerExchange exchange) throws Exception {
    exchange.dispatch(SameThreadExecutor.INSTANCE, () -> {
        RpcClient rpcClient = new RpcClient(/* ... */);
        new RpcCommand(rpcClient).observe().subscribe(new Observer<byte[]>(){
            //...
        });
    });
}

(如果不将执行程序传递给dispatch(),它会将其调度到 XNIO 工作线程池。如果您希望在自己的执行器上执行客户端创建和订阅,则应改为传递它。

相关内容

  • 没有找到相关文章

最新更新