AWS SDK v2 SdkAsyncHttpClient implementation using Java 11 java.net.http.HttpClient



我正在尝试实现一个使用 Java 11 的 java.net.http.HttpClient(特别是 sendAsync)的 SdkAsyncHttpClient。SdkAsyncHttpClient 只有一个需要实现的方法:CompletableFuture<Void> execute(AsyncExecuteRequest asyncExecuteRequest)。AsyncExecuteRequest 提供了获取 HTTP 请求详细信息的途径,其中最关键的是 SdkHttpContentPublisher。这属于响应式的发布者/订阅者模型,而 HttpClient.sendAsync 似乎内置了对这种模型的支持。我似乎已经接近实现,但(至少)缺少一个关键步骤,因为我始终无法让返回的 future 完成。

我认为我可能缺少一些以直接的方式将两者联系在一起的基本内容,但到目前为止,我还没有找到它。

这是我幼稚(且非常简单)的实现尝试:

import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
import software.amazon.awssdk.utils.AttributeMap;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import static java.net.http.HttpClient.Version.HTTP_1_1;
import static java.net.http.HttpClient.Version.HTTP_2;
import static software.amazon.awssdk.http.Protocol.HTTP2;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_TIMEOUT;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.READ_TIMEOUT;
public class JavaAsyncHttpClient implements SdkAsyncHttpClient {
private final HttpClient httpClient;
public JavaAsyncHttpClient(AttributeMap options) {
this.httpClient = HttpClient.newBuilder()
.connectTimeout(options.get(CONNECTION_TIMEOUT))
.version(options.get(PROTOCOL) == HTTP2 ? HTTP_2 : HTTP_1_1)
.build();
}
/**
 * Converts the SDK request into a {@link HttpRequest} and dispatches it with
 * {@code HttpClient.sendAsync}.
 *
 * <p>Headers are copied one value at a time, skipping {@code Content-Length} and {@code Host}.
 * POST and PUT attach the request content publisher as the body; DELETE, HEAD and OPTIONS are
 * sent without a body; PATCH is rejected.
 *
 * <p>NOTE(review): {@code asyncExecuteRequest.responseHandler()} is never called in this method,
 * so the response headers and body are not handed to the SDK by this code.
 *
 * @param asyncExecuteRequest the SDK request, including its content publisher
 * @return a future derived from {@code sendAsync} whose value is mapped to {@code null}
 * @throws UnsupportedOperationException if the request method is PATCH
 */
@Override
public CompletableFuture<Void> execute(AsyncExecuteRequest asyncExecuteRequest) {
SdkHttpRequest request = asyncExecuteRequest.request();
HttpRequest.Builder requestBuilder = HttpRequest.newBuilder().uri(request.getUri());
for (Map.Entry<String, List<String>> header : request.headers().entrySet()) {
// avoid java.lang.IllegalArgumentException: restricted header name: "Content-Length"
if (!header.getKey().equalsIgnoreCase("Content-Length") && !header.getKey().equalsIgnoreCase("Host")) {
for (String headerVal : header.getValue()) {
requestBuilder = requestBuilder.header(header.getKey(), headerVal);
}
}
}
// No case for GET and no default branch: for any other method the builder is left untouched.
// NOTE(review): presumably this relies on the builder's default method being GET - confirm.
switch (request.method()) {
case POST:
requestBuilder = requestBuilder.POST(HttpRequest.BodyPublishers.fromPublisher(
toFlowPublisher(asyncExecuteRequest.requestContentPublisher())));
break;
case PUT:
requestBuilder = requestBuilder.PUT(HttpRequest.BodyPublishers.fromPublisher(
toFlowPublisher(asyncExecuteRequest.requestContentPublisher())));
break;
case DELETE:
requestBuilder = requestBuilder.DELETE();
break;
case HEAD:
requestBuilder = requestBuilder.method("HEAD", HttpRequest.BodyPublishers.noBody());
break;
case PATCH:
throw new UnsupportedOperationException("PATCH not supported");
case OPTIONS:
requestBuilder = requestBuilder.method("OPTIONS", HttpRequest.BodyPublishers.noBody());
break;
}
// Need to use BodyHandlers.ofPublisher() or is that a dead end? How can link up the AWS Publisher/Subscribers
// NOTE(review): the future passed to BaosSubscriber is not kept in any variable, so nothing
// here can observe the bytes it collects.
Subscriber<ByteBuffer> subscriber = new BaosSubscriber(new CompletableFuture<>());
// NOTE(review): this subscribes the collector to the REQUEST content publisher, and the same
// instance is then also installed as the RESPONSE body subscriber below. For POST/PUT the
// request publisher additionally gets a second subscriber via toFlowPublisher above.
asyncExecuteRequest.requestContentPublisher().subscribe(subscriber);
HttpRequest httpRequest = requestBuilder.build();
return httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.fromSubscriber(toFlowSubscriber(subscriber)))
.thenApply(voidHttpResponse -> null);
}
/**
 * Adapts a Reactive Streams subscriber of single buffers to the
 * {@code Flow.Subscriber<List<ByteBuffer>>} shape used for response bodies.
 *
 * <p>Every buffer of each incoming list is forwarded, in order. Forwarding only the first
 * element would silently drop the remaining bytes and would throw on an empty list.
 *
 * <p>NOTE(review): one upstream list can therefore produce several downstream {@code onNext}
 * calls; demand is still forwarded unchanged, so it counts lists rather than buffers.
 *
 * @param subscriber the Reactive Streams subscriber that receives the individual buffers
 * @return a {@code Flow.Subscriber} that delegates every signal to {@code subscriber}
 */
private Flow.Subscriber<? super List<ByteBuffer>> toFlowSubscriber(Subscriber<ByteBuffer> subscriber) {
    return new Flow.Subscriber<List<ByteBuffer>>() {
        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            subscriber.onSubscribe(toAwsSubscription(subscription));
        }

        @Override
        public void onNext(List<ByteBuffer> item) {
            // Deliver the whole batch, not just its head; an empty batch delivers nothing.
            for (ByteBuffer buffer : item) {
                subscriber.onNext(buffer);
            }
        }

        @Override
        public void onError(Throwable throwable) {
            subscriber.onError(throwable);
        }

        @Override
        public void onComplete() {
            subscriber.onComplete();
        }
    };
}
/**
 * Wraps a {@code Flow.Subscription} so it can be handed to a Reactive Streams subscriber.
 *
 * @param subscription the {@code java.util.concurrent.Flow} subscription to delegate to
 * @return a Reactive Streams {@code Subscription} forwarding {@code request} and {@code cancel}
 */
private Subscription toAwsSubscription(Flow.Subscription subscription) {
    final class FlowBackedSubscription implements Subscription {
        @Override
        public void cancel() {
            subscription.cancel();
        }

        @Override
        public void request(long n) {
            subscription.request(n);
        }
    }
    return new FlowBackedSubscription();
}
/**
 * Exposes the SDK content publisher as a {@code Flow.Publisher}.
 *
 * @param requestContentPublisher the SDK publisher of request body buffers
 * @return a publisher that, on subscribe, subscribes an adapted subscriber to the SDK publisher
 */
private Flow.Publisher<ByteBuffer> toFlowPublisher(SdkHttpContentPublisher requestContentPublisher) {
    return new Flow.Publisher<ByteBuffer>() {
        @Override
        public void subscribe(Flow.Subscriber<? super ByteBuffer> downstream) {
            requestContentPublisher.subscribe(toAwsSubscriber(downstream));
        }
    };
}
/**
 * Wraps a {@code Flow.Subscriber} as a Reactive Streams subscriber.
 *
 * @param subscriber the {@code Flow} subscriber that should receive every signal
 * @return a Reactive Streams subscriber delegating all four signals to {@code subscriber}
 */
private Subscriber<? super ByteBuffer> toAwsSubscriber(Flow.Subscriber<? super ByteBuffer> subscriber) {
    final class FlowDelegatingSubscriber implements Subscriber<ByteBuffer> {
        @Override
        public void onSubscribe(Subscription upstream) {
            Flow.Subscription adapted = toFlowSubscription(upstream);
            subscriber.onSubscribe(adapted);
        }

        @Override
        public void onNext(ByteBuffer chunk) {
            subscriber.onNext(chunk);
        }

        @Override
        public void onError(Throwable failure) {
            subscriber.onError(failure);
        }

        @Override
        public void onComplete() {
            subscriber.onComplete();
        }
    }
    return new FlowDelegatingSubscriber();
}
/**
 * Wraps a Reactive Streams subscription as a {@code Flow.Subscription}.
 *
 * @param subscription the Reactive Streams subscription to delegate to
 * @return a {@code Flow.Subscription} forwarding {@code request} and {@code cancel}
 */
private Flow.Subscription toFlowSubscription(Subscription subscription) {
    final class ReactiveBackedSubscription implements Flow.Subscription {
        @Override
        public void cancel() {
            subscription.cancel();
        }

        @Override
        public void request(long n) {
            subscription.request(n);
        }
    }
    return new ReactiveBackedSubscription();
}
/**
 * No-op: the body is empty, so nothing is released or shut down here.
 */
@Override
public void close() {}

/**
 * Subscriber that accumulates every received buffer into an in-memory byte stream and
 * publishes that stream through a future once the upstream completes.
 */
private static class BaosSubscriber implements Subscriber<ByteBuffer> {
    private final CompletableFuture<ByteArrayOutputStream> streamFuture;
    private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    private Subscription subscription;

    /**
     * @param streamFuture future completed with the collected bytes, or exceptionally on failure
     */
    private BaosSubscriber(CompletableFuture<ByteArrayOutputStream> streamFuture) {
        this.streamFuture = streamFuture;
    }

    /** Remembers the subscription and immediately requests {@code Long.MAX_VALUE} items. */
    @Override
    public void onSubscribe(Subscription subscription) {
        this.subscription = subscription;
        this.subscription.request(Long.MAX_VALUE);
    }

    /** Appends a copy of the buffer's bytes, then requests {@code Long.MAX_VALUE} items again. */
    @Override
    public void onNext(ByteBuffer byteBuffer) {
        byte[] chunk = BinaryUtils.copyBytesFrom(byteBuffer);
        try {
            baos.write(chunk);
            subscription.request(Long.MAX_VALUE);
        } catch (IOException writeFailure) {
            // Should never happen
            streamFuture.completeExceptionally(writeFailure);
        }
    }

    /** Fails the future with the upstream error. */
    @Override
    public void onError(Throwable t) {
        streamFuture.completeExceptionally(t);
    }

    /** Completes the future with everything collected so far. */
    @Override
    public void onComplete() {
        streamFuture.complete(baos);
    }
}

我在这里遗漏了什么?返回一个以 null 完成的 future 符合 SdkAsyncHttpClient 的规范,因此很明显,HTTP 响应需要以某种方式传递给 AWS 一侧的订阅者——但这正是我卡住的地方。

编辑:刚刚通过谷歌搜索找到这个:https://github.com/rmcsoft/j11_aws_http_client/blob/63f05326990317c59f1863be55942054769b437e/src/main/java/com/rmcsoft/aws/http/proxy/BodyHandlerProxy.java - 看看答案是否在里面。

当我提出这个问题时,我并不知道这条路已经有人走过了。Nikita Skornyakov(GitHub 上的 @rmcsoft)已经实现了完全相同的东西(一个使用 Java 11 HTTP 客户端 java.net.http 的 SdkAsyncHttpClient 实现)。可以在这里找到:https://github.com/rmcsoft/j11_aws_http_client (MIT 许可证)。

为了完整起见,这里有一个自包含的(你可能永远不应该使用的)Java 实现:

package com.dow.as2;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.http.AbortableInputStream;
import software.amazon.awssdk.http.Protocol;
import software.amazon.awssdk.http.SdkHttpConfigurationOption;
import software.amazon.awssdk.http.SdkHttpFullResponse;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
import software.amazon.awssdk.http.async.SdkHttpContentPublisher;
import software.amazon.awssdk.utils.AttributeMap;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicReference;
import static java.net.http.HttpClient.Version.HTTP_1_1;
import static java.net.http.HttpClient.Version.HTTP_2;
import static software.amazon.awssdk.http.Protocol.HTTP2;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.CONNECTION_TIMEOUT;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.PROTOCOL;
import static software.amazon.awssdk.http.SdkHttpConfigurationOption.READ_TIMEOUT;
public class JavaAsyncHttpClient implements SdkAsyncHttpClient {
private static final String CLIENT_NAME = "JavaNetAsyncHttpClient";
private final HttpClient httpClient;
private JavaAsyncHttpClient(AttributeMap options) {
this.httpClient = HttpClient.newBuilder()
.connectTimeout(options.get(CONNECTION_TIMEOUT))
.version(options.get(PROTOCOL) == HTTP2 ? HTTP_2 : HTTP_1_1)
.build();
}
/**
 * Creates a new builder for this client.
 *
 * @return a fresh {@code DefaultBuilder}
 */
public static Builder builder() {
return new DefaultBuilder();
}
/**
 * Creates a {@link JavaAsyncHttpClient} from a builder on which no option has been set.
 *
 * @return the client produced by calling {@code build()} on a new default builder
 */
public static SdkAsyncHttpClient create() {
    return builder().build();
}
/**
 * Sends the SDK request through {@code HttpClient.sendAsync} and wires the response into the
 * request's {@link SdkAsyncHttpResponseHandler}.
 *
 * <p>Response headers reach the handler through {@link BodyHandlerProxy}; once the body
 * publisher is available it is passed to {@code onStream}. Any failure in that chain is
 * reported via {@code onError} with the underlying cause, after which the returned future
 * completes normally with {@code null}.
 *
 * @param asyncExecuteRequest the SDK request, its content publisher and its response handler
 * @return a future that completes with {@code null} after {@code onStream} or {@code onError}
 *         has been invoked on the handler
 */
@Override
public CompletableFuture<Void> execute(AsyncExecuteRequest asyncExecuteRequest) {
    SdkHttpRequest request = asyncExecuteRequest.request();
    HttpRequest.Builder requestBuilder = HttpRequest.newBuilder().uri(request.getUri());
    copyHeaders(request, requestBuilder);
    applyMethod(asyncExecuteRequest, requestBuilder);

    // Fetch the handler once so the header callback and the stream callback share one instance.
    SdkAsyncHttpResponseHandler responseHandler = asyncExecuteRequest.responseHandler();
    BodyHandlerProxy bodyHandler = new BodyHandlerProxy(responseHandler);
    return httpClient
            .sendAsync(requestBuilder.build(), bodyHandler)
            .thenApply(HttpResponse::body)
            .thenApply(this::toAwsPublisher)
            .thenAccept(responseHandler::onStream)
            .exceptionally(failure -> {
                responseHandler.onError(unwrapCompletionException(failure));
                return null;
            });
}

/** Copies every header value except {@code Content-Length} and {@code Host}. */
private static void copyHeaders(SdkHttpRequest request, HttpRequest.Builder requestBuilder) {
    for (Map.Entry<String, List<String>> header : request.headers().entrySet()) {
        String name = header.getKey();
        // avoid java.lang.IllegalArgumentException: restricted header name: "Content-Length"
        if (name.equalsIgnoreCase("Content-Length") || name.equalsIgnoreCase("Host")) {
            continue;
        }
        for (String value : header.getValue()) {
            requestBuilder.header(name, value);
        }
    }
}

/** Sets the HTTP method, attaching the SDK content publisher for methods that carry a body. */
private void applyMethod(AsyncExecuteRequest asyncExecuteRequest, HttpRequest.Builder requestBuilder) {
    switch (asyncExecuteRequest.request().method()) {
        case POST:
            requestBuilder.POST(new BodyPublisherProxy(asyncExecuteRequest.requestContentPublisher()));
            break;
        case PUT:
            requestBuilder.PUT(new BodyPublisherProxy(asyncExecuteRequest.requestContentPublisher()));
            break;
        case PATCH:
            // There is no dedicated PATCH() shortcut on the builder; the generic method(...) is used.
            requestBuilder.method("PATCH", new BodyPublisherProxy(asyncExecuteRequest.requestContentPublisher()));
            break;
        case DELETE:
            requestBuilder.DELETE();
            break;
        case HEAD:
            requestBuilder.method("HEAD", HttpRequest.BodyPublishers.noBody());
            break;
        case OPTIONS:
            requestBuilder.method("OPTIONS", HttpRequest.BodyPublishers.noBody());
            break;
        default:
            // Everything else (GET) keeps the method the builder already has.
            break;
    }
}

/** Strips the {@link CompletionException} wrapper that dependent stages put around a failure. */
private static Throwable unwrapCompletionException(Throwable failure) {
    if (failure instanceof CompletionException && failure.getCause() != null) {
        return failure.getCause();
    }
    return failure;
}
/**
 * Presents a {@code Flow.Subscription} as a Reactive Streams {@code Subscription}.
 *
 * @param subscription the {@code Flow} subscription that actually handles the calls
 * @return a wrapper forwarding {@code request} and {@code cancel} unchanged
 */
private Subscription toAwsSubscription(Flow.Subscription subscription) {
    final class FlowBackedSubscription implements Subscription {
        @Override
        public void cancel() {
            subscription.cancel();
        }

        @Override
        public void request(long n) {
            subscription.request(n);
        }
    }
    return new FlowBackedSubscription();
}
/**
 * Presents a Reactive Streams subscriber as a {@code Flow.Subscriber}.
 *
 * @param subscriber the Reactive Streams subscriber that should receive every signal
 * @return a {@code Flow.Subscriber} delegating all four signals to {@code subscriber}
 */
private Flow.Subscriber<? super ByteBuffer> toFlowSubscriber(Subscriber<? super ByteBuffer> subscriber) {
    final class ReactiveDelegatingSubscriber implements Flow.Subscriber<ByteBuffer> {
        @Override
        public void onSubscribe(Flow.Subscription upstream) {
            Subscription adapted = toAwsSubscription(upstream);
            subscriber.onSubscribe(adapted);
        }

        @Override
        public void onNext(ByteBuffer chunk) {
            subscriber.onNext(chunk);
        }

        @Override
        public void onError(Throwable failure) {
            subscriber.onError(failure);
        }

        @Override
        public void onComplete() {
            subscriber.onComplete();
        }
    }
    return new ReactiveDelegatingSubscriber();
}
/**
 * Presents a {@code Flow.Publisher} as a Reactive Streams {@code Publisher}.
 *
 * @param publisher the {@code Flow} publisher that produces the buffers
 * @return a publisher that subscribes an adapted subscriber to {@code publisher}
 */
private Publisher<ByteBuffer> toAwsPublisher(Flow.Publisher<ByteBuffer> publisher) {
    return downstream -> publisher.subscribe(toFlowSubscriber(downstream));
}
/**
 * No-op: the body is empty, so nothing is released or shut down here.
 */
@Override
public void close() {
}
/**
 * Returns the name of this client implementation.
 *
 * @return the {@code CLIENT_NAME} constant
 */
@Override
public String clientName() {
return CLIENT_NAME;
}
/**
 * Builder that collects options in an {@link AttributeMap} and creates the client from them.
 */
private static final class DefaultBuilder implements Builder {
    private final AttributeMap.Builder standardOptions = AttributeMap.builder();

    private DefaultBuilder() {
    }

    /**
     * Stores the read timeout under {@code READ_TIMEOUT}.
     *
     * <p>NOTE(review): the client constructor in this file only reads {@code CONNECTION_TIMEOUT}
     * and {@code PROTOCOL}, so this value does not appear to be applied anywhere - confirm.
     *
     * @param socketTimeout the timeout as a {@link Duration}
     * @return this object for method chaining
     */
    public Builder socketTimeout(Duration socketTimeout) {
        standardOptions.put(READ_TIMEOUT, socketTimeout);
        return this;
    }

    /** Bean-style variant of {@link #socketTimeout(Duration)}. */
    public void setSocketTimeout(Duration socketTimeout) {
        standardOptions.put(READ_TIMEOUT, socketTimeout);
    }

    /**
     * Stores the connect timeout under {@code CONNECTION_TIMEOUT}; the client constructor
     * passes it to {@code HttpClient.Builder.connectTimeout}.
     *
     * @param connectionTimeout the timeout as a {@link Duration}
     * @return this object for method chaining
     */
    public Builder connectionTimeout(Duration connectionTimeout) {
        standardOptions.put(CONNECTION_TIMEOUT, connectionTimeout);
        return this;
    }

    /** Bean-style variant of {@link #connectionTimeout(Duration)}. */
    public void setConnectionTimeout(Duration connectionTimeout) {
        standardOptions.put(CONNECTION_TIMEOUT, connectionTimeout);
    }

    /**
     * Stores the protocol under {@code PROTOCOL}.
     *
     * @param protocol the protocol to use
     * @return this object for method chaining
     */
    public Builder protocol(Protocol protocol) {
        standardOptions.put(PROTOCOL, protocol);
        return this;
    }

    /**
     * Used by the SDK to create a {@link SdkAsyncHttpClient} with service-default values if no
     * other values have been configured. The explicitly set options are merged first with the
     * service defaults and then with {@code SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS}.
     *
     * @param serviceDefaults Service specific defaults. Keys will be one of the constants defined in
     *                        {@link SdkHttpConfigurationOption}.
     * @return an instance of {@link SdkAsyncHttpClient}
     */
    @Override
    public SdkAsyncHttpClient buildWithDefaults(AttributeMap serviceDefaults) {
        AttributeMap explicit = standardOptions.build();
        AttributeMap withServiceDefaults = explicit.merge(serviceDefaults);
        AttributeMap resolved = withServiceDefaults.merge(SdkHttpConfigurationOption.GLOBAL_HTTP_DEFAULTS);
        return new JavaAsyncHttpClient(resolved);
    }
}
/**
 * Body handler that reports the response headers to the SDK handler and then hands the body
 * over to a new {@link BodySubscriberProxy}.
 */
private static class BodyHandlerProxy implements HttpResponse.BodyHandler<Flow.Publisher<ByteBuffer>> {
    private final SdkAsyncHttpResponseHandler handler;

    /**
     * @param responseHandler the SDK handler to notify; must not be {@code null}
     * @throws NullPointerException if {@code responseHandler} is {@code null}
     */
    private BodyHandlerProxy(SdkAsyncHttpResponseHandler responseHandler) {
        this.handler = Objects.requireNonNull(responseHandler);
    }

    /**
     * Passes the status line and headers to {@code onHeaders}, then returns the body subscriber.
     *
     * @param responseInfo status code and headers of the response
     * @return a new {@link BodySubscriberProxy}
     */
    @Override
    public HttpResponse.BodySubscriber<Flow.Publisher<ByteBuffer>> apply(HttpResponse.ResponseInfo responseInfo) {
        SdkHttpHeadersProxy headers = new SdkHttpHeadersProxy(responseInfo);
        handler.onHeaders(headers);
        return new BodySubscriberProxy();
    }
}
/**
 * Clearable holder for the downstream subscriber.
 *
 * <p>The reference is cleared when the subscription is cancelled or the stream terminates, so
 * later signals find {@code null} and are dropped. The field is {@code volatile} because it is
 * cleared and read from different call paths (cancel versus delivery/completion) that need not
 * run on the same thread; without it a cleared reference might not become visible to the
 * delivering thread.
 */
static final class SubscriberRef {
    volatile Flow.Subscriber<? super ByteBuffer> ref;

    /**
     * @param subscriber the subscriber to hold; may later be removed with {@link #clear()}
     */
    SubscriberRef(Flow.Subscriber<? super ByteBuffer> subscriber) {
        ref = subscriber;
    }

    /**
     * @return the held subscriber, or {@code null} once it has been cleared
     */
    Flow.Subscriber<? super ByteBuffer> get() {
        return ref;
    }

    /**
     * Removes the held subscriber.
     *
     * @return the subscriber held before the call, or {@code null} if it was already cleared
     */
    Flow.Subscriber<? super ByteBuffer> clear() {
        Flow.Subscriber<? super ByteBuffer> res = ref;
        ref = null;
        return res;
    }
}
/**
 * Subscription handed to the downstream subscriber. It forwards to the upstream subscription
 * and clears the shared {@link SubscriberRef} on cancel.
 */
static final class SubscriptionRef implements Flow.Subscription {
    final Flow.Subscription subscription;
    final SubscriberRef subscriberRef;

    /**
     * @param subscription  the upstream subscription to delegate to
     * @param subscriberRef holder of the downstream subscriber
     */
    SubscriptionRef(Flow.Subscription subscription,
                    SubscriberRef subscriberRef) {
        this.subscription = subscription;
        this.subscriberRef = subscriberRef;
    }

    /** Forwards the demand upstream unless the subscriber reference has been cleared. */
    @Override
    public void request(long n) {
        if (subscriberRef.get() == null) {
            return;
        }
        subscription.request(n);
    }

    /** Cancels the upstream subscription, then clears the subscriber reference. */
    @Override
    public void cancel() {
        subscription.cancel();
        subscriberRef.clear();
    }

    /** Calls {@code onSubscribe(this)} on the held subscriber, if one is still present. */
    void subscribe() {
        Flow.Subscriber<?> target = subscriberRef.get();
        if (target == null) {
            return;
        }
        target.onSubscribe(this);
    }

    @Override
    public String toString() {
        String upstreamType = subscription.getClass().getName();
        int upstreamIdentity = System.identityHashCode(subscription);
        return "SubscriptionRef/" + upstreamType + "@" + upstreamIdentity;
    }
}
// Adapted from jdk.internal.net.http.ResponseSubscribers.PublishingBodySubscriber
/**
 * Body subscriber whose body value is itself a {@code Flow.Publisher<ByteBuffer>}.
 *
 * <p>It receives the response as lists of buffers and re-publishes the individual buffers to
 * at most one downstream subscriber. Three futures coordinate the hand-off:
 * {@code subscriptionCF} (upstream subscription received), {@code subscribedCF} (downstream
 * subscriber has been given its subscription) and {@code completionCF} (upstream terminated).
 */
private static class BodySubscriberProxy implements HttpResponse.BodySubscriber<Flow.Publisher<ByteBuffer>> {
// Completed in onSubscribe with the upstream subscription.
private final CompletableFuture<Flow.Subscription>
subscriptionCF = new CompletableFuture<>();
// Completed in subscribe(...) after the downstream subscriber's onSubscribe has returned.
private final CompletableFuture<SubscriberRef>
subscribedCF = new CompletableFuture<>();
// Holds the single accepted downstream subscriber; set once via compareAndSet in subscribe(...).
private AtomicReference<SubscriberRef>
subscriberRef = new AtomicReference<>();
// The body (a publisher backed by this::subscribe) becomes available once subscriptionCF completes.
private final CompletableFuture<Flow.Publisher<ByteBuffer>> body =
subscriptionCF.thenCompose(
(s) -> CompletableFuture.completedFuture(this::subscribe));
// Completed by signalComplete()/signalError(...) when the upstream terminates.
private final CompletableFuture<Void> completionCF;
BodySubscriberProxy() {
completionCF = new CompletableFuture<>();
// The terminal signal is relayed through subscribedCF, so complete(...) only runs once a
// downstream subscriber has been subscribed.
completionCF.whenComplete(
(r, t) -> subscribedCF.thenAccept(s -> complete(s, t)));
}
/**
 * @return the stage that yields the body publisher
 */
public CompletionStage<Flow.Publisher<ByteBuffer>> getBody() {
return body;
}

// This is a callback for the subscribedCF.
// Do not call directly!
// Clears the subscriber reference and delivers onError(t) when t is non-null, otherwise
// onComplete(); a throwable escaping onComplete() is passed to the same subscriber's onError.
private void complete(SubscriberRef ref, Throwable t) {
Flow.Subscriber<?> s = ref.clear();
// maybe null if subscription was cancelled
if (s == null) {
return;
}
if (t != null) {
s.onError(t);
return;
}
try {
s.onComplete();
} catch (Throwable x) {
s.onError(x);
}
}
// Completes completionCF exceptionally; a null error is replaced by an IllegalArgumentException.
private void signalError(Throwable err) {
completionCF.completeExceptionally(err != null ? err : new IllegalArgumentException("null throwable"));
}
// Completes completionCF normally.
private void signalComplete() {
completionCF.complete(null);
}
// Implementation of the published body's subscribe method (see the body field).
// The first subscriber is accepted and receives a SubscriptionRef once the upstream
// subscription is available; any later subscriber gets a no-op subscription followed by
// onError(IllegalStateException).
private void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
if (subscriber == null) {
throw new IllegalArgumentException("subscriber must not be null");
}
SubscriberRef ref = new SubscriberRef(subscriber);
if (subscriberRef.compareAndSet(null, ref)) {
subscriptionCF.thenAccept((s) -> {
SubscriptionRef subscription = new SubscriptionRef(s, ref);
try {
subscription.subscribe();
subscribedCF.complete(ref);
} catch (Throwable t) {
// A failure here cancels the upstream; the throwable itself is not reported anywhere.
subscription.cancel();
}
});
} else {
subscriber.onSubscribe(new Flow.Subscription() {
@Override
public void request(long n) {
}
@Override
public void cancel() {
}
});
subscriber.onError(new IllegalStateException("This publisher has already one subscriber"));
}
}
/** Records the upstream subscription by completing {@code subscriptionCF}. */
@Override
public void onSubscribe(Flow.Subscription subscription) {
subscriptionCF.complete(subscription);
}
/**
 * Forwards each buffer of the list to the downstream subscriber, one {@code onNext} per buffer.
 * Any throwable raised here is turned into an error signal and the upstream is cancelled.
 */
@Override
public void onNext(List<ByteBuffer> item) {
try {
// NOTE(review): if no subscriber has been registered yet, ref is null and ref.get() throws;
// the catch below then reports that as an error - confirm upstream cannot deliver this early.
SubscriberRef ref = subscriberRef.get();
Flow.Subscriber<? super ByteBuffer> subscriber = ref.get();
if (subscriber != null) { // may be null if subscription was cancelled.
// NOTE(review): one upstream item can yield several downstream onNext calls, while
// SubscriptionRef.request forwards demand unchanged - demand counts lists, not buffers.
item.forEach(subscriber::onNext);
}
} catch (Throwable err) {
signalError(err);
subscriptionCF.thenAccept(Flow.Subscription::cancel);
}
}
/** Relays an upstream failure as the terminal signal. */
@Override
public void onError(Throwable throwable) {
// onError can be called before request(1), and therefore can
// be called before subscriberRef is set.
signalError(throwable);
}
/** Relays upstream completion, or an {@code InternalError} if no subscription was ever received. */
@Override
public void onComplete() {
// cannot be called before onSubscribe()
if (!subscriptionCF.isDone()) {
signalError(new InternalError("onComplete called before onSubscribed"));
} else {
// onComplete can be called before request(1),
// and therefore can be called before subscriberRef
// is set.
signalComplete();
}
}
}
/**
 * View of a {@link HttpResponse.ResponseInfo} as an {@link SdkHttpFullResponse} that exposes
 * only the status code and headers.
 */
private static class SdkHttpHeadersProxy implements SdkHttpFullResponse {
    private final HttpResponse.ResponseInfo responseInfo;

    /**
     * @param responseInfo status code and headers to expose; must not be {@code null}
     * @throws NullPointerException if {@code responseInfo} is {@code null}
     */
    private SdkHttpHeadersProxy(HttpResponse.ResponseInfo responseInfo) {
        this.responseInfo = Objects.requireNonNull(responseInfo);
    }

    /** @return always empty; no status text is taken from the response info */
    @Override
    public Optional<String> statusText() {
        return Optional.empty();
    }

    /** @return the status code reported by the response info */
    @Override
    public int statusCode() {
        return responseInfo.statusCode();
    }

    /** @return the response headers as a name-to-values map */
    @Override
    public Map<String, List<String>> headers() {
        return responseInfo.headers().map();
    }

    /** @return a builder seeded with this response's headers and status code */
    @Override
    public Builder toBuilder() {
        var seeded = SdkHttpResponse.builder();
        var withHeaders = seeded.headers(headers());
        return withHeaders.statusCode(statusCode());
    }

    /** @return always empty */
    @Override
    public Optional<AbortableInputStream> content() {
        return Optional.empty(); // will be available at later stage
    }
}
/**
 * Request body publisher backed by the SDK's {@link SdkHttpContentPublisher}.
 */
private class BodyPublisherProxy implements HttpRequest.BodyPublisher {
    private final SdkHttpContentPublisher publisher;

    /**
     * @param publisher the SDK content publisher; must not be {@code null}
     * @throws NullPointerException if {@code publisher} is {@code null}
     */
    private BodyPublisherProxy(SdkHttpContentPublisher publisher) {
        this.publisher = Objects.requireNonNull(publisher);
    }

    /** @return the length reported by the SDK publisher, or {@code -1} when it reports none */
    @Override
    public long contentLength() {
        Optional<Long> declared = publisher.contentLength();
        return declared.isPresent() ? declared.get() : -1L;
    }

    /** Subscribes an adapted form of the given subscriber to the SDK publisher. */
    @Override
    public void subscribe(Flow.Subscriber<? super ByteBuffer> subscriber) {
        Subscriber<? super ByteBuffer> adapted = toAwsSubscriber(subscriber);
        publisher.subscribe(adapted);
    }
}
/**
 * Presents a Reactive Streams subscription as a {@code Flow.Subscription}.
 *
 * @param subscription the Reactive Streams subscription that actually handles the calls
 * @return a wrapper forwarding {@code request} and {@code cancel} unchanged
 */
private Flow.Subscription toFlowSubscription(Subscription subscription) {
    final class ReactiveBackedSubscription implements Flow.Subscription {
        @Override
        public void cancel() {
            subscription.cancel();
        }

        @Override
        public void request(long n) {
            subscription.request(n);
        }
    }
    return new ReactiveBackedSubscription();
}
/**
 * Presents a {@code Flow.Subscriber} as a Reactive Streams subscriber.
 *
 * @param subscriber the {@code Flow} subscriber that should receive every signal
 * @return a Reactive Streams subscriber delegating all four signals to {@code subscriber}
 */
private Subscriber<? super ByteBuffer> toAwsSubscriber(Flow.Subscriber<? super ByteBuffer> subscriber) {
    final class FlowDelegatingSubscriber implements Subscriber<ByteBuffer> {
        @Override
        public void onSubscribe(Subscription upstream) {
            Flow.Subscription adapted = toFlowSubscription(upstream);
            subscriber.onSubscribe(adapted);
        }

        @Override
        public void onNext(ByteBuffer chunk) {
            subscriber.onNext(chunk);
        }

        @Override
        public void onError(Throwable failure) {
            subscriber.onError(failure);
        }

        @Override
        public void onComplete() {
            subscriber.onComplete();
        }
    }
    return new FlowDelegatingSubscriber();
}
}

我建议使用前面链接的 j11_aws_http_client(例如,上面的代码只处理了一小部分受限制的标头)。上面的代码几乎完全是从该 GitHub 项目中复制粘贴而来的。

如果有办法直接使用 java.net.http.HttpResponse.BodySubscribers.ofPublisher()(它产生的是一个 Flow.Publisher<List<ByteBuffer>>),实现就可以大大简化。

最新更新