从Java reactor上下文创建/重新创建Azure CosmosDb异步客户端



由于公司的政策,我们的CosmosDb密钥会随着Azure KeyVault中的新密钥定期轮换,因此我们需要在Java weblfux/reactive应用程序中处理这个不断变化的密钥。从Java SDK中,一旦从主密钥创建了CosmosAsyncClient,就无法更改它,我们需要用大致以下代码重建它:

SecretAsyncClient secretAsyncClient = new SecretClientBuilder().buildAsyncClient()
...
Mono<CosmosAsyncClient> client = secretAsyncClient.getSecret(KEY_NAME).map(
s -> s.getValue()
).map(
key -> new CosmosClientBuilder()
.endpoint(HOST)
.key(key)
.buildAsyncClient()
);
return client.flatMap(
...
);

从日志中看,getSecret()还可以,但后来我得到了:

05:25:00.597 INFO  c.a.c.i.RxDocumentClientImpl - Initializing DocumentClient [4] with serviceEndpoint [https://xxxx-cosmosdb-sql-dev.documents.azure.com:443/], connectionPolicy [ConnectionPolicy{httpNetworkRequestTimeout=PT1M, tcpNetworkRequestTimeout=PT5S, connectionMode=DIRECT, maxConnectionPoolSize=1000, idleHttpConnectionTimeout=PT1M, idleTcpConnectionTimeout=PT0S, userAgentSuffix='', throttlingRetryOptions=RetryOptions{maxRetryAttemptsOnThrottledRequests=9, maxRetryWaitTime=PT30S}, endpointDiscoveryEnabled=true, preferredRegions=[Switzerland North], multipleWriteRegionsEnabled=true, proxyType=null, inetSocketProxyAddress=null, readRequestsFallbackEnabled=true, connectTimeout=PT5S, idleTcpEndpointTimeout=PT1H, maxConnectionsPerEndpoint=130, maxRequestsPerConnection=30, tcpConnectionEndpointRediscoveryEnabled=true}], consistencyLevel [Session], directModeProtocol [Tcp]05:25:00.598 ERROR c.a.c.i.RxDocumentClientImpl - unexpected failure in initializing client.
java.lang.RuntimeException: java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-epoll-3
at com.azure.cosmos.implementation.http.ReactorNettyClient.attemptToWarmupHttpClient(ReactorNettyClient.java:118)
at com.azure.cosmos.implementation.http.ReactorNettyClient.createWithConnectionProvider(ReactorNettyClient.java:98)
at com.azure.cosmos.implementation.http.HttpClient.createFixed(HttpClient.java:61)
at com.azure.cosmos.implementation.RxDocumentClientImpl.httpClient(RxDocumentClientImpl.java:617)
at com.azure.cosmos.implementation.RxDocumentClientImpl.<init>(RxDocumentClientImpl.java:410)
at com.azure.cosmos.implementation.RxDocumentClientImpl.<init>(RxDocumentClientImpl.java:262)
at com.azure.cosmos.implementation.RxDocumentClientImpl.<init>(RxDocumentClientImpl.java:230)
at com.azure.cosmos.implementation.AsyncDocumentClient$Builder.build(AsyncDocumentClient.java:243)
at com.azure.cosmos.CosmosAsyncClient.<init>(CosmosAsyncClient.java:129)
at com.azure.cosmos.CosmosClientBuilder.buildAsyncClient(CosmosClientBuilder.java:779)

所以我似乎需要将CosmosClientBuilder().buildAsyncClient()部分放入执行器池?有没有更简单或更优雅的方法可以做到这一点?

我对源代码进行了简短的挖掘,错误消息是正确的。CosmosClientAsync中存在阻塞调用。

这个库做了一些非常丑陋的事情,他们在NettyHttpClient中的warmup函数上进行动态查找,然后调用它。

Netty HttpClient上的预热功能(根据文档(执行以下操作:

根据实际配置,返回一个触发的Mono

  • 事件循环组的初始化
  • 主机名解析程序的初始化
  • 加载传输所需的本机库

默认情况下,当不使用方法时,连接操作会占用初始化和加载资源所需的额外时间。

在cosmos库中,这个预热调用似乎是不可配置的。

我想我真的会在他们的github上打开一个问题,问他们这件事。

最新更新