我正在使用Java 11的HttpClient
将请求发布到HTTP2服务器。如下所示,HTTPCLIENT对象是单胎spring bean创建的。
@Bean
public HttpClient getClient() {
return HttpClient.newBuilder().version(Version.HTTP_2).executor(Executors.newFixedThreadPool(20)).followRedirects(Redirect.NORMAL)
.connectTimeout(Duration.ofSeconds(20)).build();
}
我正在使用sendasync方法异步发送请求。
当我尝试连续击中服务器时,我将在特定时间" java.io.io.io exception"后收到错误:并发流太多。我在客户端构建中使用了固定的ThreadPool来尝试克服此错误,但是它仍然给出相同的错误。
例外堆栈是..
java.util.concurrent.CompletionException: java.io.IOException: too many concurrent streams
at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:367) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1108) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2235) ~[?:?]
at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:345) ~[java.net.http:?]
at java.net.http/jdk.internal.net.http.MultiExchange.lambda$responseAsync0$2(MultiExchange.java:250) ~[java.net.http:?]
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1072) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705) ~[?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
at java.base/java.lang.Thread.run(Thread.java:834) [?:?]
Caused by: java.io.IOException: too many concurrent streams
at java.net.http/jdk.internal.net.http.Http2Connection.reserveStream(Http2Connection.java:440) ~[java.net.http:?]
at java.net.http/jdk.internal.net.http.Http2ClientImpl.getConnectionFor(Http2ClientImpl.java:103) ~[java.net.http:?]
at java.net.http/jdk.internal.net.http.ExchangeImpl.get(ExchangeImpl.java:88) ~[java.net.http:?]
at java.net.http/jdk.internal.net.http.Exchange.establishExchange(Exchange.java:293) ~[java.net.http:?]
at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl0(Exchange.java:425) ~[java.net.http:?]
at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl(Exchange.java:330) ~[java.net.http:?]
at java.net.http/jdk.internal.net.http.Exchange.responseAsync(Exchange.java:322) ~[java.net.http:?]
at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:304) ~[java.net.http:?]
有人可以帮助我解决这个问题吗?
服务器是tomcat9,其最大并发流是默认的。
当我尝试连续击中服务器
时
服务器具有max_concurrent_streams
的设置,该设置在最初建立HTTP/2连接期间与客户端通信。
如果您使用sendAsync
盲目地"连续击中服务器",则您不在等待以前的请求来完成,最终您超过了max_concurrent_streams
值并接收上述错误。
解决方案是同时发送小于max_concurrent_streams
的许多请求;之后,您只有在以前的要求完成后才发送新请求。这可以使用Semaphore
或类似的东西在客户端上轻松实现。
不幸的是,@sbordet建议的Semaphore
方法对我无效。我尝试了:
var semaphore = semaphores.computeIfAbsent(getRequestKey(request), k -> new Semaphore(MAX_CONCURRENT_REQUESTS_NUMBER));
CompletableFuture.runAsync(semaphore::acquireUninterruptibly, WAITING_POOL)
.thenComposeAsync(ignored -> httpClient.sendAsync(request, responseBodyHandler), ASYNC_POOL)
.whenComplete((response, e) -> semaphore.release());
不能保证连接流将在执行传递到释放信号量的下一个CompletableFuture
时发布。对我来说,这种方法在正常执行时使用,但是,如果有任何例外,似乎在调用semaphore.release()
后可以关闭连接流。
最后,我最终使用OKHTTP。它处理问题(如果同时流到达max_concurrent_streams
的数量,它只会等到一些流释放到一些流。它还处理GOAWAY
帧。对于Java HttpClient
,我必须实现重试逻辑来处理此操作,因为如果服务器发送GOAWAY
帧,则仅抛出IOException
。
我认为 @sbordet的答案是不正确的,并且由于您每秒的请求超过MAX_CONCURRENT_STREAMS
而不会发生此错误,但是因为开放的HTTP流数量(每个HTTP 2连接?)超过编号。
例如,我有一台在工作中的服务器,其MAX_CONCURRENT_STREAMS
设置为128:
$ curl -iv -H "Content-Type: application/json" https://example.local
...
* Connection state changed (MAX_CONCURRENT_STREAMS == 128)!
但是,我似乎可以每秒大约1000个请求来击中它,而不会收回任何错误:
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
public class TooManyConcurrentStreams1 {
private static final int CONCURRENCY = 1000;
public static void main(String[] args) {
final var counter = new AtomicInteger();
final var singletonHttpClient = newHttpClient();
final var singletonRequest = newRequest();
final var responses = new ArrayList<CompletableFuture<HttpResponse<Void>>>(CONCURRENCY);
for (int i = 0; i < CONCURRENCY; i++) {
responses.add(singletonHttpClient.sendAsync(singletonRequest, BodyHandlers.discarding()));
}
for (CompletableFuture<HttpResponse<Void>> response : responses) {
response.thenAccept(x -> {});
response.join();
System.out.println(counter.incrementAndGet());
}
singletonHttpClient.executor().ifPresent(executor -> {
if (executor instanceof ExecutorService executorService) {
executorService.shutdown();
}
});
}
public static HttpRequest newRequest() {
return HttpRequest.newBuilder()
.uri(Constants.TEST_URI)
.header("Content-Type", Constants.CONTENT_TYPE)
.header("Accept", Constants.CONTENT_TYPE)
.POST(HttpRequest.BodyPublishers.ofString(Constants.BODY))
.build();
}
public static HttpClient newHttpClient() {
return HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.executor(Executors.newFixedThreadPool(CONCURRENCY))
.build();
}
}
当我将CONCURRENCY
增加到2000年的荒谬数字时我遇到了这个错误,而不是java.io.IOException: too many concurrent streams
:
Exception in thread "main" java.util.concurrent.CompletionException: java.net.SocketException: Connection reset
at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368)
at java.base/java.util.concurrent.CompletableFuture.completeRelay(CompletableFuture.java:377)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1152)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2162)
at java.net.http/jdk.internal.net.http.Stream.completeResponseExceptionally(Stream.java:1153)
at java.net.http/jdk.internal.net.http.Stream.cancelImpl(Stream.java:1238)
at java.net.http/jdk.internal.net.http.Stream.connectionClosing(Stream.java:1212)
at java.net.http/jdk.internal.net.http.Http2Connection.shutdown(Http2Connection.java:710)
at java.net.http/jdk.internal.net.http.Http2Connection$Http2TubeSubscriber.processQueue(Http2Connection.java:1323)
at java.net.http/jdk.internal.net.http.common.SequentialScheduler$LockingRestartableTask.run(SequentialScheduler.java:205)
at java.net.http/jdk.internal.net.http.common.SequentialScheduler$CompleteRestartableTask.run(SequentialScheduler.java:149)
at java.net.http/jdk.internal.net.http.common.SequentialScheduler$SchedulableTask.run(SequentialScheduler.java:230)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
但是,我可以使用此代码重现您的错误(我首先点击此错误,然后在此处找到您的问题!)
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class TooManyConcurrentStreams2 {
public static void main(String[] args) {
final var singletonHttpClient = newHttpClient();
final var singletonRequest = newRequest();
final var counter = new AtomicInteger();
final var scheduler = Executors.newScheduledThreadPool(2);
scheduler.schedule(scheduler::shutdown, 1, TimeUnit.HOURS);
scheduler.scheduleAtFixedRate(() -> {
final var batchSize = counter.incrementAndGet();
final var responses = new ArrayList<CompletableFuture<HttpResponse<Void>>>(batchSize);
try {
for (int i = 0; i < batchSize; i++) {
responses.add(
singletonHttpClient.sendAsync(
singletonRequest,
BodyHandlers.discarding()
)
);
}
for (CompletableFuture<HttpResponse<Void>> response : responses) {
response.thenAccept(x -> {
});
response.join();
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("batchSize = " + batchSize);
}, 0, 500, TimeUnit.MILLISECONDS);
}
public static HttpRequest newRequest() {
return HttpRequest.newBuilder()
.uri(Constants.TEST_URI)
.header("Content-Type", Constants.CONTENT_TYPE)
.header("Accept", Constants.CONTENT_TYPE)
.POST(HttpRequest.BodyPublishers.ofString(Constants.BODY))
.build();
}
public static HttpClient newHttpClient() {
return HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.build();
}
}
这个人在每500ms可运行的一次我一次执行我的一次:
java.util.concurrent.CompletionException: java.io.IOException: too many concurrent streams
at java.base/java.util.concurrent.CompletableFuture.encodeRelay(CompletableFuture.java:368)
at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1189)
at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2309)
at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:453)
at java.net.http/jdk.internal.net.http.MultiExchange.lambda$responseAsync0$2(MultiExchange.java:341)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: too many concurrent streams
因此,问题不是每秒请求数量,而是其他内容,这似乎是每HTTP连接/客户端的并发开放流的数量。
我们可以通过不是共享所有批次请求的同一http客户端(和请求):
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class TooManyConcurrentStreams2 {
public static void main(String[] args) {
final var counter = new AtomicInteger();
final var scheduler = Executors.newScheduledThreadPool(2);
scheduler.schedule(scheduler::shutdown, 1, TimeUnit.HOURS);
scheduler.scheduleAtFixedRate(() -> {
final var httpClient = newHttpClient();
final var request = newRequest();
final var batchSize = counter.incrementAndGet();
final var responses = new ArrayList<CompletableFuture<HttpResponse<Void>>>(batchSize);
try {
for (int i = 0; i < batchSize; i++) {
responses.add(
httpClient.sendAsync(
request,
BodyHandlers.discarding()
)
);
}
for (CompletableFuture<HttpResponse<Void>> response : responses) {
response.thenAccept(x -> {
});
response.join();
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("batchSize = " + batchSize);
}, 0, 500, TimeUnit.MILLISECONDS);
}
public static HttpRequest newRequest() {
return HttpRequest.newBuilder()
.uri(Constants.TEST_URI)
.header("Content-Type", Constants.CONTENT_TYPE)
.header("Accept", Constants.CONTENT_TYPE)
.POST(HttpRequest.BodyPublishers.ofString(Constants.BODY))
.build();
}
public static HttpClient newHttpClient() {
return HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.build();
}
}
对我来说,这是在第143次尝试中失败的,并使用此错误消息:
java.util.concurrent.CompletionException: java.lang.InternalError: java.net.SocketException: Too many open files
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1159)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.InternalError: java.net.SocketException: Too many open files
at java.net.http/jdk.internal.net.http.PlainHttpConnection.<init>(PlainHttpConnection.java:293)
at java.net.http/jdk.internal.net.http.AsyncSSLConnection.<init>(AsyncSSLConnection.java:49)
at java.net.http/jdk.internal.net.http.HttpConnection.getSSLConnection(HttpConnection.java:293)
at java.net.http/jdk.internal.net.http.HttpConnection.getConnection(HttpConnection.java:279)
at java.net.http/jdk.internal.net.http.Http2Connection.createAsync(Http2Connection.java:369)
at java.net.http/jdk.internal.net.http.Http2ClientImpl.getConnectionFor(Http2ClientImpl.java:128)
at java.net.http/jdk.internal.net.http.ExchangeImpl.get(ExchangeImpl.java:93)
at java.net.http/jdk.internal.net.http.Exchange.establishExchange(Exchange.java:343)
at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl0(Exchange.java:475)
at java.net.http/jdk.internal.net.http.Exchange.responseAsyncImpl(Exchange.java:380)
at java.net.http/jdk.internal.net.http.Exchange.responseAsync(Exchange.java:372)
at java.net.http/jdk.internal.net.http.MultiExchange.responseAsyncImpl(MultiExchange.java:408)
at java.net.http/jdk.internal.net.http.MultiExchange.lambda$responseAsync0$2(MultiExchange.java:341)
at java.base/java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:1150)
... 5 more
这很可能是由于我笔记本电脑相对较低的12544 ulimit
。