可观察的 ZIP 运算符在使用 vertx http 客户端时挂起



我做什么:

我正在使用 vertx rx http 客户端来执行大量 HTTP 请求。 在这种特定情况下,我调用"方法 A",它返回一个 ID 列表。 要接收所有 ID,我需要多次调用方法 A 以获得下一批结果。(每次我指定要接收的不同页码时)

为了提高性能并尽可能并行地进行调用,我创建了一个 (RxJava) 可观察项的列表,每个项都表示单个页面请求的结果。 当我完成创建此列表时,我调用 Obserable.zip 运算符并传递可观察列表。

问题:

在没有特殊设置的情况下使用 vertx http 客户端,一切正常,但速度非常慢。 例如,3000 个 http 请求在 5 分钟内处理完毕。

我试图通过设置 vertx http 客户端选项来提高性能,如下所示:

HttpClientOptions options = new HttpClientOptions();
options.setMaxPoolSize(50)
.setKeepAlive(true)
.setPipelining(true)
.setTcpKeepAlive(true)
.setPipeliningLimit(25)
.setMaxWaitQueueSize(10000);

但是当我这样做时,我得到的结果不稳定:有时一切正常,我能够在不到 20 秒的时间内收到所有响应。 但是,有时我调用的外部服务器会关闭连接,日志显示以下错误:

io.vertx.core.http.impl.HttpClientRequestImpl
SEVERE: io.vertx.core.VertxException: Connection was closed
  • 我的代码中没有调用错误处理程序
  • 出现此错误时,zip 运算符挂起

这是创建 HttpClientRequest 的代码。

public Observable<HttpRestResponse> postWithResponse(String url, Map<String, String> headers, String body) {
Observable<HttpRestResponse> bufferObservable = Observable.create(subscriber -> {
try {
HttpClientRequest request = httpClient.postAbs(url);
addHeadersToRequest(headers, request);
sendRequest(url, subscriber, request, body);
}catch (Exception e) {
try {
subscriber.onError(e);
}catch (Exception ex) {
logger.error("error calling onError for subscriber",ex);
}finally {
subscriber.onCompleted();
}
}
});
return bufferObservable;
}
private void sendRequest(String requestUrl, Subscriber<? super HttpRestResponse> subscriber, HttpClientRequest request, String bodyData) {
final long requestId = reqNumber.getAndIncrement();
if (bodyData != null) {
request.putHeader("Content-Length", String.valueOf(bodyData.getBytes().length);
}
request.putHeader("Accept-Encoding", "gzip,deflate");
Observable<HttpRestResponse> retVal = request.toObservable()
.doOnError(throwable -> {
logger.error("<<< #: " + requestId + " HTTP call failed. requestUrl [" + requestUrl + "] reason:" + throwable.getMessage());
}).doOnNext(response -> {
if (response != null) {
logger.debug(" <<< #: " + requestId + " " + response.statusCode() + " " + response.statusMessage() + " " + requestUrl);
}
}).flatMap(httpClientResponse -> {
try {
if (httpClientResponse != null && doCheckResponse(httpClientResponse, requestUrl, requestId, bodyData)) {
Observable<Buffer> bufferObservable = httpClientResponse.toObservable()
.reduce(Buffer.buffer(1000), (result, buffer) -> result.appendBuffer(buffer));
return bufferObservable.flatMap(buffer -> Observable.just(new HttpRestResponse(buffer, httpClientResponse)));
}
} catch (Exception e) {
logger.error("error in RestHttpClient", e);
}
return Observable.just(new HttpRestResponse(null, httpClientResponse));
});
retVal.subscribe(subscriber);
if (bodyData != null) {
request.end(bodyData); // write post data
} else {
request.end();
}
}

阿斯达斯达斯德

如果您认为可以像这样挂接异常逻辑

try {
HttpClientRequest request = httpClient.postAbs(url);
addHeadersToRequest(headers, request);
sendRequest(url, subscriber, request, body);
}catch (Exception e) {
try {
subscriber.onError(e);
}catch (Exception ex) {
logger.error("error calling onError for subscriber",ex);
}finally {
subscriber.onCompleted();
}
}

您不会收到任何错误,因为基本上您的所有处理现在都驻留在 Rx 生态系统中,因此在您的 try catch 块中不会向您报告任何内容。

从这个时间点开始的错误会来自您的

bufferObservable.onErrorReturn()

bufferObservable.subscribe(success, error)

所以最后我想通了。 似乎我传递了可观察.zip方法一个空列表......

这里的问题是 onNext 或 onError 没有在"zip"方法的返回的可观察对象上被调用。 在这种情况下,只调用 onComplete,我没有费心为它创建处理程序......

非常感谢所有有兴趣并试图提供帮助的人。

亚尼夫

最新更新