Java 11 HttpClient HTTP/2: "too many concurrent streams" error



I am using Java 11's HttpClient to POST requests to an HTTP/2 server. The HttpClient object is created as a singleton Spring bean, as shown below.

@Bean
public HttpClient getClient() {
    return HttpClient.newBuilder()
            .version(Version.HTTP_2)
            .executor(Executors.newFixedThreadPool(20))
            .followRedirects(Redirect.NORMAL)
            .connectTimeout(Duration.ofSeconds(20))
            .build();
}

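I send the requests asynchronously with the sendAsync method.

When I hit the server continuously, after a certain time I start getting the error "java.io.IOException: too many concurrent streams". I set a fixed thread pool as the executor in the client builder to try to work around this, but I still get the same error.

The exception stack trace is: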

java.util.concurrent.CompletionException: java.io.IOException: too many concurrent streams
    at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1108) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2235) ~[?:?]
    at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:345) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.MultiExchange.lambda$responseAsync0$2(MultiExchange.java:250) ~[java.net.http:?]
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
    at java.base/java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.io.IOException: too many concurrent streams
    at java.net.http/jdk.internal.net.http.Http2Connection.reserveStream(Http2Connection.java:440) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.Http2ClientImpl.getConnectionFor(Http2ClientImpl.java:103) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.ExchangeImpl.get(ExchangeImpl.java:88) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.Exchange.establishExchange(Exchange.java:293) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl0(Exchange.java:425) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl(Exchange.java:330) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.Exchange.responseAsync(Exchange.java:322) ~[java.net.http:?]
    at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:304) ~[java.net.http:?]

Can anyone help me fix this?

The server is Tomcat 9, and its maximum number of concurrent streams is left at the default.

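"When I try to hit the server continuously"

The server has a max_concurrent_streams setting, which it communicates to the client when the HTTP/2 connection is first established.

If you blindly "hit the server continuously" with sendAsync, you are not waiting for previous requests to finish, so eventually you exceed the max_concurrent_streams value and receive the error above.

The solution is to send at most max_concurrent_streams requests concurrently; after that, send a new request only when a previous one has completed. This can easily be implemented on the client with a Semaphore or something similar.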
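The Semaphore idea above could be sketched roughly as follows. This is only an illustration under assumptions: the class name ConcurrencyLimiter and its methods are hypothetical, the limit you pass in should stay below the server's max_concurrent_streams, and the calling thread blocks while no permit is free.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical helper (not part of the JDK): caps the number of async operations in flight. */
final class ConcurrencyLimiter {
    private final Semaphore permits;

    ConcurrencyLimiter(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight);
    }

    /**
     * Blocks the caller until a permit is free, starts the operation, and
     * gives the permit back when the operation's future completes,
     * whether normally or exceptionally.
     */
    <T> CompletableFuture<T> submit(Supplier<CompletableFuture<T>> operation) {
        permits.acquireUninterruptibly();
        CompletableFuture<T> future;
        try {
            future = operation.get();
        } catch (RuntimeException e) {
            permits.release(); // the operation never started, so return the permit
            throw e;
        }
        return future.whenComplete((result, error) -> permits.release());
    }

    /** Number of permits currently free; mainly useful for diagnostics. */
    int availablePermits() {
        return permits.availablePermits();
    }
}
```

With the client from the question, a call might look like limiter.submit(() -> httpClient.sendAsync(request, BodyHandlers.ofString())). The permit is tied to completion of the returned future, which is assumed here to coincide closely enough with the stream being freed on the connection.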

Unfortunately, the Semaphore approach suggested by @sbordet did not work for me. I tried:

var semaphore = semaphores.computeIfAbsent(
        getRequestKey(request), k -> new Semaphore(MAX_CONCURRENT_REQUESTS_NUMBER));
CompletableFuture.runAsync(semaphore::acquireUninterruptibly, WAITING_POOL)
        .thenComposeAsync(ignored -> httpClient.sendAsync(request, responseBodyHandler), ASYNC_POOL)
        .whenComplete((response, e) -> semaphore.release());

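There is no guarantee that the connection stream has already been released by the time execution reaches the next CompletableFuture stage, the one that releases the semaphore. For me this approach works during normal execution, but when an exception occurs it seems the connection stream can be closed after semaphore.release() has been called.

In the end I switched to OkHttp. It handles the problem: if the number of concurrent streams reaches max_concurrent_streams, it simply waits until some streams are released. It also handles GOAWAY frames. With the Java HttpClient I had to implement retry logic for that case, because it just throws an IOException when the server sends a GOAWAY frame.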
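The retry logic mentioned above is not shown in the answer; one possible shape is sketched below. Everything here is an assumption for illustration: the AsyncRetry class and its method names are hypothetical, every IOException is treated as retryable, there is no backoff between attempts, and retrying non-idempotent requests such as POST can duplicate work on the server.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

/** Hypothetical helper (not part of the JDK): re-runs an async operation that fails with an IOException. */
final class AsyncRetry {
    private AsyncRetry() {}

    /** Runs the operation up to maxAttempts times; only IOException failures trigger a retry. */
    static <T> CompletableFuture<T> withRetry(Supplier<CompletableFuture<T>> operation, int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(operation, maxAttempts, result);
        return result;
    }

    private static <T> void attempt(Supplier<CompletableFuture<T>> operation,
                                    int attemptsLeft,
                                    CompletableFuture<T> result) {
        CompletableFuture<T> current;
        try {
            current = operation.get();
        } catch (RuntimeException e) {
            result.completeExceptionally(e); // the operation could not even be started
            return;
        }
        current.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            Throwable cause = unwrap(error);
            if (cause instanceof IOException && attemptsLeft > 1) {
                attempt(operation, attemptsLeft - 1, result); // try again
            } else {
                result.completeExceptionally(cause);
            }
        });
    }

    /** Strips the CompletionException wrapper that CompletableFuture stages add. */
    private static Throwable unwrap(Throwable error) {
        return (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause()
                : error;
    }
}
```

A call could look like AsyncRetry.withRetry(() -> httpClient.sendAsync(request, responseBodyHandler), 3). A production version would likely add a delay between attempts and a narrower test for which failures are safe to retry.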

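I think @sbordet's answer is incorrect: this error does not occur because your requests per second exceed MAX_CONCURRENT_STREAMS, but because the number of open HTTP streams (per HTTP/2 connection?) exceeds that number.

For example, I have a server at work whose MAX_CONCURRENT_STREAMS is set to 128: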

$ curl -iv -H "Content-Type: application/json" https://example.local
...
* Connection state changed (MAX_CONCURRENT_STREAMS == 128)!

Yet I seem to be able to hit it with about 1000 requests per second without getting any errors back:

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class TooManyConcurrentStreams1 {

    private static final int CONCURRENCY = 1000;

    public static void main(String[] args) {
        final var counter = new AtomicInteger();
        final var singletonHttpClient = newHttpClient();
        final var singletonRequest = newRequest();
        final var responses = new ArrayList<CompletableFuture<HttpResponse<Void>>>(CONCURRENCY);

        // Fire all requests at once through the single shared client.
        for (int i = 0; i < CONCURRENCY; i++) {
            responses.add(singletonHttpClient.sendAsync(singletonRequest, BodyHandlers.discarding()));
        }

        // Wait for every response and count the completed ones.
        for (CompletableFuture<HttpResponse<Void>> response : responses) {
            response.thenAccept(x -> {});
            response.join();
            System.out.println(counter.incrementAndGet());
        }

        // Shut the executor down so the JVM can exit.
        singletonHttpClient.executor().ifPresent(executor -> {
            if (executor instanceof ExecutorService executorService) {
                executorService.shutdown();
            }
        });
    }

    public static HttpRequest newRequest() {
        return HttpRequest.newBuilder()
                .uri(Constants.TEST_URI)
                .header("Content-Type", Constants.CONTENT_TYPE)
                .header("Accept", Constants.CONTENT_TYPE)
                .POST(HttpRequest.BodyPublishers.ofString(Constants.BODY))
                .build();
    }

    public static HttpClient newHttpClient() {
        return HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_2)
                .executor(Executors.newFixedThreadPool(CONCURRENCY))
                .build();
    }
}

When I increase CONCURRENCY to an absurd number such as 2000, I get the following error instead of java.io.IOException: too many concurrent streams:

Exception in thread "main" java.util.concurrent.CompletionException: java.net.SocketException: Connection reset
    at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368)
    at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1152)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
    at java.net.http/jdk.internal.net.http.Stream.completeResponseExceptionally(Stream.java:1153)
    at java.net.http/jdk.internal.net.http.Stream.cancelImpl(Stream.java:1238)
    at java.net.http/jdk.internal.net.http.Stream.connectionClosing(Stream.java:1212)
    at java.net.http/jdk.internal.net.http.Http2Connection.shutdown(Http2Connection.java:710)
    at java.net.http/jdk.internal.net.http.Http2Connection$Http2TubeSubscriber.processQueue(Http2Connection.java:1323)
    at java.net.http/jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.run(SequentialScheduler.java:205)
    at java.net.http/jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.run(SequentialScheduler.java:149)
    at java.net.http/jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.run(SequentialScheduler.java:230)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

However, I can reproduce your error with the following code (I hit this error first, and then found your question here!):

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TooManyConcurrentStreams2 {

    public static void main(String[] args) {
        final var singletonHttpClient = newHttpClient();
        final var singletonRequest = newRequest();
        final var counter = new AtomicInteger();

        final var scheduler = Executors.newScheduledThreadPool(2);

        scheduler.schedule(scheduler::shutdown, 1, TimeUnit.HOURS);

        // Every 500 ms, send a batch that is one request larger than the last,
        // always through the same shared client.
        scheduler.scheduleAtFixedRate(() -> {
            final var batchSize = counter.incrementAndGet();
            final var responses = new ArrayList<CompletableFuture<HttpResponse<Void>>>(batchSize);

            try {
                for (int i = 0; i < batchSize; i++) {
                    responses.add(
                            singletonHttpClient.sendAsync(
                                    singletonRequest,
                                    BodyHandlers.discarding()
                            )
                    );
                }

                for (CompletableFuture<HttpResponse<Void>> response : responses) {
                    response.thenAccept(x -> {
                    });
                    response.join();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

            System.out.println("batchSize = " + batchSize);
        }, 0, 500, TimeUnit.MILLISECONDS);
    }

    public static HttpRequest newRequest() {
        return HttpRequest.newBuilder()
                .uri(Constants.TEST_URI)
                .header("Content-Type", Constants.CONTENT_TYPE)
                .header("Accept", Constants.CONTENT_TYPE)
                .POST(HttpRequest.BodyPublishers.ofString(Constants.BODY))
                .build();
    }

    public static HttpClient newHttpClient() {
        return HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_2)
                .build();
    }
}

For me, this one fails on one of the executions of the runnable that is scheduled every 500 ms:

java.util.concurrent.CompletionException: java.io.IOException: too many concurrent streams
    at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368)
    at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1189)
    at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
    at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:453)
    at java.net.http/jdk.internal.net.http.MultiExchange.lambda$responseAsync0$2(MultiExchange.java:341)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: too many concurrent streams

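So the problem is not the number of requests per second but something else, which appears to be the number of concurrently open streams per HTTP connection/client.

We can check this by not sharing the same HTTP client (and request) across all batches of requests: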

import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class TooManyConcurrentStreams3 {

    public static void main(String[] args) {
        final var counter = new AtomicInteger();

        final var scheduler = Executors.newScheduledThreadPool(2);

        scheduler.schedule(scheduler::shutdown, 1, TimeUnit.HOURS);

        scheduler.scheduleAtFixedRate(() -> {
            // A fresh client (and request) per batch, so batches never share a connection.
            final var httpClient = newHttpClient();
            final var request = newRequest();
            final var batchSize = counter.incrementAndGet();

            final var responses = new ArrayList<CompletableFuture<HttpResponse<Void>>>(batchSize);

            try {
                for (int i = 0; i < batchSize; i++) {
                    responses.add(
                            httpClient.sendAsync(
                                    request,
                                    BodyHandlers.discarding()
                            )
                    );
                }

                for (CompletableFuture<HttpResponse<Void>> response : responses) {
                    response.thenAccept(x -> {
                    });
                    response.join();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

            System.out.println("batchSize = " + batchSize);
        }, 0, 500, TimeUnit.MILLISECONDS);
    }

    public static HttpRequest newRequest() {
        return HttpRequest.newBuilder()
                .uri(Constants.TEST_URI)
                .header("Content-Type", Constants.CONTENT_TYPE)
                .header("Accept", Constants.CONTENT_TYPE)
                .POST(HttpRequest.BodyPublishers.ofString(Constants.BODY))
                .build();
    }

    public static HttpClient newHttpClient() {
        return HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_2)
                .build();
    }
}

For me, this failed on the 143rd attempt, with this error message:

java.util.concurrent.CompletionException: java.lang.InternalError: java.net.SocketException: Too many open files
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1159)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.InternalError: java.net.SocketException: Too many open files
    at java.net.http/jdk.internal.net.http.PlainHttpConnection.<init>(PlainHttpConnection.java:293)
    at java.net.http/jdk.internal.net.http.AsyncSSLConnection.<init>(AsyncSSLConnection.java:49)
    at java.net.http/jdk.internal.net.http.HttpConnection.getSSLConnection(HttpConnection.java:293)
    at java.net.http/jdk.internal.net.http.HttpConnection.getConnection(HttpConnection.java:279)
    at java.net.http/jdk.internal.net.http.Http2Connection.createAsync(Http2Connection.java:369)
    at java.net.http/jdk.internal.net.http.Http2ClientImpl.getConnectionFor(Http2ClientImpl.java:128)
    at java.net.http/jdk.internal.net.http.ExchangeImpl.get(ExchangeImpl.java:93)
    at java.net.http/jdk.internal.net.http.Exchange.establishExchange(Exchange.java:343)
    at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl0(Exchange.java:475)
    at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl(Exchange.java:380)
    at java.net.http/jdk.internal.net.http.Exchange.responseAsync(Exchange.java:372)
    at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:408)
    at java.net.http/jdk.internal.net.http.MultiExchange.lambda$responseAsync0$2(MultiExchange.java:341)
    at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
    ... 5 more

This is most likely due to my laptop's relatively low ulimit of 12544.
