在 Java 11 HttpClient 中取消 http 请求



我正在尝试通过新的Java 11 HttpClient取消http请求。

这是我的测试代码:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
public class App {
public static void main(String... args) throws InterruptedException {
HttpClient client = HttpClient.newBuilder().build();
URI uri = URI.create("http://releases.ubuntu.com/18.04.2/ubuntu-18.04.2-desktop-amd64.iso");
HttpRequest request = HttpRequest.newBuilder().uri(uri).GET().build();
var bodyHandler = HttpResponse.BodyHandlers.ofByteArrayConsumer(b -> System.out.println("#"));
var future = client.sendAsync(request, bodyHandler);
Thread.sleep(1000);
future.cancel(true);
System.out.println("rn----------CANCEL!!!------------");
System.out.println("rnisCancelled: " + future.isCancelled());
Thread.sleep(250);
}
}

我希望,该请求任务将在调用future.cancel(true);行后立即取消。因此,控制台中最后打印的行应isCancelled: true

但是,当我运行此代码时,我看到如下所示的内容:

##
## ----------取消!!!------------ #### 已取消:真 ##

那么,这是取消请求的正确方法吗?

UPD

取消请求的正确方法是(正如 Daniel 建议的那样,+ UPD2:避免在方法调用cancel()NPE):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandler;
import java.net.http.HttpResponse.BodySubscriber;
import java.net.http.HttpResponse.ResponseInfo;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow.Subscription;
public class App {
private static class SubscriberWrapper implements BodySubscriber<Void> {
private final CountDownLatch latch;
private final BodySubscriber<Void> subscriber;
private Subscription subscription;
private SubscriberWrapper(BodySubscriber<Void> subscriber, CountDownLatch latch) {
this.subscriber = subscriber;
this.latch = latch;
}
@Override
public CompletionStage<Void> getBody() {
return subscriber.getBody();
}
@Override
public void onSubscribe(Subscription subscription) {
subscriber.onSubscribe(subscription);
this.subscription = subscription;
latch.countDown();
}
@Override
public void onNext(List<ByteBuffer> item) {
subscriber.onNext(item);
}
@Override
public void onError(Throwable throwable) {
subscriber.onError(throwable);
}
@Override
public void onComplete() {
subscriber.onComplete();
}
public void cancel() {
subscription.cancel();
System.out.println("rn----------CANCEL!!!------------");
}
}
private static class BodyHandlerWrapper implements BodyHandler<Void> {
private final CountDownLatch latch = new CountDownLatch(1);
private final BodyHandler<Void> handler;
private SubscriberWrapper subscriberWrapper;
private BodyHandlerWrapper(BodyHandler<Void> handler) {
this.handler = handler;
}
@Override
public BodySubscriber<Void> apply(ResponseInfo responseInfo) {
subscriberWrapper = new SubscriberWrapper(handler.apply(responseInfo), latch);
return subscriberWrapper;
}
public void cancel() {
CompletableFuture.runAsync(() -> {
try {
latch.await();
subscriberWrapper.cancel();
} catch (InterruptedException e) {}
});
}
}
public static void main(String... args) throws InterruptedException, ExecutionException {
HttpClient client = HttpClient.newBuilder().build();
URI uri = URI.create("http://releases.ubuntu.com/18.04.2/ubuntu-18.04.2-desktop-amd64.iso");
HttpRequest request = HttpRequest.newBuilder().uri(uri).GET().build();
var handler = HttpResponse.BodyHandlers.ofByteArrayConsumer(b -> System.out.print("#"));
BodyHandlerWrapper handlerWrapper = new BodyHandlerWrapper(handler);
client.sendAsync(request, handlerWrapper).thenAccept(b -> System.out.println(b.statusCode()));
Thread.sleep(1000);
handlerWrapper.cancel();
System.out.println("rn------Invoke cancel...---------");
Thread.sleep(2500);
}
}

您可以使用java.net.http.HttpClientAPI 取消 HTTP 请求,方法是取消传递给响应BodySubscriberFlow.Subscription对象。 为了保留订阅对象,简单地包装其中一个提供的BodyHandler/BodySubscriber实现应该相对容易。不幸的是,客户端返回的CompletableFuturecancel方法与传递给BodySubscriberFlow.Subscriptioncancel方法之间没有关系。取消请求的正确方法是通过订阅的cancel方法。

取消订阅将同时使用同步(HttpClient::send)和异步(HttpClient::sendAsync)方法。 但是,它将产生不同的效果,具体取决于请求是通过HTTP/1.1还是HTTP/2.0发送的(使用HTTP/1.1将导致连接关闭,使用HTTP/2.0将导致流重置)。当然,如果响应的最后一个字节已经传递给BodySubscriber,它可能根本没有影响。

更新:从Java 16开始,可以通过中断调用HttpClient::send的线程或在HttpClient::sendAsync返回的CompletableFuture上调用cancel(true)来取消请求。这已由 JDK-8245462 实现

同步 VS 异步

请求可以同步或异步发送。同步 API 会阻止,直到 Http 响应可用

HttpResponse<String> response =
client.send(request, BodyHandlers.ofString());
System.out.println(response.statusCode());
System.out.println(response.body());

异步 API 会立即返回一个 CompletableFuture,当它可用时,它会以 HttpResponse 完成。CompletableFuture是在Java 8中添加的,支持可组合的异步编程。

client.sendAsync(request, BodyHandlers.ofString())
.thenApply(response -> { System.out.println(response.statusCode());
return response; } )
.thenApply(HttpResponse::body)
.thenAccept(System.out::println);

未来对象

未来表示异步计算的结果。爪哇文档

这意味着它不是一个同步函数,并且您的假设"我希望,该请求任务将在之后立即取消"仅适用于同步方法。

检查未来对象的取消

如果您想检查任务是否已取消,则有一个有用的isCancelled()方法。

if(future.isCancelled()) {
// Future object is cancelled, do smth
} else {
// Future object is still running, do smth
}

sendAsync() 返回一个 CompletableFuture 对象

该方法sendAsync()返回一个 CompletableFuture。请注意,CompletableFuture实现了Future的接口。

您可以执行以下操作:

client.sendAsync(request, BodyHandlers.ofString())
.thenAccept(response -> {
// do action when completed;
});

在技术术语中,thenAccept方法添加一个在响应可用时要调用的Consumer

为什么取消方法而不是可组合的未来不起作用

由于(与FutureTask不同)此类无法直接控制导致其完成的计算,因此取消被视为异常完成的另一种形式。方法取消与completeExceptionally(new CancellationException())具有相同的效果。方法isCompletedExceptionally()可用于确定CompletableFuture是否以任何特殊方式完成。

如果以CompletionException完成异常,则get()方法和get(long, TimeUnit)抛出与相应CompletionException中持有的原因相同的ExecutionException。为了简化大多数上下文中的用法,此类还定义了在这些情况下直接抛出CompletionException的方法join()和 getNow(T)。

换句话说,

cancel()方法不使用中断来取消,这就是它不起作用的原因。你应该使用completeExceptionally(new CancellationException())

参考

  • https://openjdk.java.net/groups/net/httpclient/intro.html
  • https://docs.oracle.com/javase/8/docs/api/index.html?java/util/concurrent/Future.html

至少对于同步请求,您可以中断调用httpClient.send(..)的线程

然后,http 客户端尽可能快地中止请求并抛出InterruptedException本身。

最新更新