如何完成不是字节数组输入流的异步 HTTP 客户端输入流?



我正在使用Async Http客户端从互联网上下载大量(可能很大(文件。

在我的特殊情况下,我需要将字节流从这些下载 URL 发送到另一个服务进行分析。

一个幼稚的方法是这样做:

AsyncHttpClient asyncHttpClient = Dsl.asyncHttpClient(Dsl.config()
.setMaxConnectionsPerHost(-1)
.setMaxConnections(-1)
.setPooledConnectionIdleTimeout(60 * 10 * 1000)
.setConnectionTtl(6 * 60 * 1000)
.setConnectTimeout(5 * 1000)
.setRequestTimeout(5 * 60 * 1000)
.setFollowRedirect(true)
.setRealm(new Realm.Builder(username, password)
.setNtlmDomain(domain)
.setScheme(Realm.AuthScheme.NTLM)
.build())
Response httpGetResponse = asyncHttpClient.prepareGet(url).execute().get();
return httpGetResponse.getResponseBodyAsStream();

但是在异步 http 请求的本教程中,我们了解到与 HTTP 组件 http 客户端不同,异步 http 客户端会将整个文件下载到内存中。

就我而言,这将很快导致 OOM。

所以另一种选择是这样的:

Response httpGetResponse = asyncHttpClient.prepareGet(url).execute(new AsyncHandler<Response>() {
private final Response.ResponseBuilder builder = new Response.ResponseBuilder();
@Override
public State onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
bodyPart.getBodyByteBuffer(); // Each chunk of bytes will be fed into this method.
// I need to write these bytes to the resuting input stream
// without streaming them all into memory.
return State.CONTINUE;
}
@Override
public State onHeadersReceived(HttpHeaders headers) throws Exception {
builder.accumulate(headers);
return State.CONTINUE;
}
@Override
public State onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
builder.accumulate(responseStatus);
return State.CONTINUE;
}
@Override
public Response onCompleted() throws Exception {
return builder.build();
}
@Override
public void onThrowable(Throwable t) {
}
}).get();

当这些字节进入输入流时,获取这些字节的最简单、最干净的方法是什么?

我有两个想法:

1( 将输入写入文件,然后流式传输文件 或 2( 立即返回管道输入流,字节将在接收时写入管道输入流。

有没有人可以分享一个工作示例?

我正确地假设有人已经这样做了。事实上,在我对"异步http客户端"和"管道输入流"进行搜索后,我在项目本身中找到了这个:

https://github.com/AsyncHttpClient/async-http-client/blob/master/client/src/main/java/org/asynchttpclient/handler/BodyDeferringAsyncHandler.java

用法:

PipedInputStream pipedInputStream = new PipedInputStream();
PipedOutputStream pipedOutputStream = new PipedOutputStream(pipedInputStream);
BodyDeferringAsyncHandler bodyDeferringAsyncHandler = new BodyDeferringAsyncHandler(pipedOutputStream);
Future<Response> futureResponse = asyncHttpClient.prepareGet(url).execute(bodyDeferringAsyncHandler);
Response response = bodyDeferringAsyncHandler.getResponse();
if (response.getStatusCode() == 200) {
return new BodyDeferringAsyncHandler.BodyDeferringInputStream(futureResponse,
bodyDeferringAsyncHandler,
pipedInputStream);
} else {
return null;
}

最新更新