OkHttpAsyncHttpClient 抛出 IOException:关闭从 Azure 容器下载 blob



我无法使用 azure-sdk-for-java 从 azure 下载 blob。OkHttpAsyncHttpClient 抛出 IOException:在读取 InputStream 时关闭。我不知道关闭是从哪里来的。 azure-storage-blob 版本是 12.1.0,azure-core-http-okhttp 是 1.1.0,使用 com.squareup.okhttp3.okhttp 版本 4.2.2

BlobServiceClient blobServiceClient = new BlobServiceClientBuilder()
.connectionString(getConnectionString())
.httpClient(new OkHttpAsyncHttpClientBuilder().build())
.buildClient();
BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(getContainerName());
for (BlobItem blobItem : containerClient.listBlobs()) {
String fileName = blobItem.getName();
BlobClient blobClient2 = containerClient.getBlobClient(fileName);      
blobClient2.downloadToFile(fileName);  
blobClient2.delete();
}

例外:

2020-02-11 15:54:00,928 | INFO  | -pubblobsdk_Worker-2 | o.q.c.JobRunShell                | 715 - org.quartz-scheduler.quartz - 2.3.0 | Job myGroup.us_gov_dod_af_cce_mm_pubblobsdk_route2 threw a JobExecutionException: 
org.quartz.JobExecutionException: reactor.core.Exceptions$ReactiveException: java.io.IOException: closed
at org.apache.camel.component.quartz2.CamelJob.execute(CamelJob.java:61) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.quartz.core.JobRunShell.run(JobRunShell.java:202) [!/:?]
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) [!/:?]
Caused by: reactor.core.Exceptions$ReactiveException: java.io.IOException: closed
at reactor.core.Exceptions.propagate(Exceptions.java:336) ~[!/:3.3.0.RELEASE]
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:91) ~[!/:3.3.0.RELEASE]
at reactor.core.publisher.Mono.block(Mono.java:1663) ~[!/:3.3.0.RELEASE]
at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:94) ~[!/:?]
at com.azure.storage.blob.specialized.BlobClientBase.downloadToFileWithResponse(BlobClientBase.java:481) ~[!/:?]
at com.azure.storage.blob.specialized.BlobClientBase.downloadToFile(BlobClientBase.java:442) ~[!/:?]
at mil.af.cce2.mm.templates.azure.lib.AzureLibBean.downloadBlobFromContainer(AzureLibBean.java:480) ~[!/:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:481) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:300) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:273) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:187) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:53) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.processor.loadbalancer.QueueLoadBalancer.process(QueueLoadBalancer.java:44) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.processor.loadbalancer.LoadBalancerSupport.process(LoadBalancerSupport.java:97) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.component.quartz2.CamelJob.execute(CamelJob.java:58) ~[!/:2.21.0.fuse-750033-redhat-00001]
... 2 more
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:93) ~[!/:3.3.0.RELEASE]
at reactor.core.publisher.Mono.block(Mono.java:1663) ~[!/:3.3.0.RELEASE]
at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:94) ~[!/:?]
at com.azure.storage.blob.specialized.BlobClientBase.downloadToFileWithResponse(BlobClientBase.java:481) ~[!/:?]
at com.azure.storage.blob.specialized.BlobClientBase.downloadToFile(BlobClientBase.java:442) ~[!/:?]
at mil.af.cce2.mm.templates.azure.lib.AzureLibBean.downloadBlobFromContainer(AzureLibBean.java:480) ~[!/:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_212]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_212]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_212]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_212]
at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:481) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:300) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:273) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:187) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:53) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:138) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:101) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.processor.loadbalancer.QueueLoadBalancer.process(QueueLoadBalancer.java:44) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.processor.loadbalancer.LoadBalancerSupport.process(LoadBalancerSupport.java:97) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.apache.camel.component.quartz2.CamelJob.execute(CamelJob.java:58) ~[!/:2.21.0.fuse-750033-redhat-00001]
at org.quartz.core.JobRunShell.run(JobRunShell.java:202) [!/:?]
at org.quartz.simpl.SimpleThreadPool$WorkerThread.run(SimpleThreadPool.java:573) [!/:?]
Caused by: java.io.IOException: closed
at okio.RealBufferedSource$inputStream$1.read(RealBufferedSource.kt:434) ~[?:?]
at java.io.InputStream.read(InputStream.java:101) ~[?:1.8.0_212]
at com.azure.core.http.okhttp.OkHttpAsyncHttpClient$OkHttpResponse.lambda$toFluxByteBuffer$6(OkHttpAsyncHttpClient.java:293) ~[?:?]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:100) ~[?:?]
at reactor.core.publisher.FluxRepeatPredicate$RepeatPredicateSubscriber.onNext(FluxRepeatPredicate.java:79) ~[?:?]
at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:99) ~[?:?]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:1920) ~[?:?]
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155) ~[?:?]
at reactor.core.publisher.FluxTakeUntil$TakeUntilPredicateSubscriber.request(FluxTakeUntil.java:133) ~[?:?]
at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:179) ~[?:?]
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155) ~[?:?]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyMain.request(MonoFlatMapMany.java:105) ~[?:?]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:1920) ~[?:?]
at reactor.core.publisher.StrictSubscriber.request(StrictSubscriber.java:138) ~[?:?]
at com.azure.core.util.FluxUtil$1$1.completed(FluxUtil.java:257) ~[?:?]
at com.azure.core.util.FluxUtil$1$1.completed(FluxUtil.java:248) ~[?:?]
at sun.nio.ch.Invoker.invokeUnchecked(Invoker.java:126) ~[?:1.8.0_212]
at sun.nio.ch.SimpleAsynchronousFileChannelImpl$3.run(SimpleAsynchronousFileChannelImpl.java:389) ~[?:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_212]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_212]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_212]

这似乎是一个错误。

通过检查源代码,如果不添加 maven 依赖项azure-core-http-okhttp它将使用该defaultProvider来创建 HttpClient 实例。

public final class HttpClientProviders {
private static HttpClientProvider defaultProvider;
private static final String CANNOT_FIND_HTTP_CLIENT =
"Cannot find any HttpClient provider on the classpath - unable to create a default HttpClient instance";
static {
ServiceLoader<HttpClientProvider> serviceLoader = ServiceLoader.load(HttpClientProvider.class);
// Use the first provider found in the service loader iterator.
Iterator<HttpClientProvider> it = serviceLoader.iterator();
if (it.hasNext()) {
defaultProvider = it.next();
}
}
private HttpClientProviders() {
// no-op
}
public static HttpClient createInstance() {
if (defaultProvider == null) {
throw new IllegalStateException(CANNOT_FIND_HTTP_CLIENT);
}
return defaultProvider.createInstance();
}
}

并且,默认情况下,使用azure-core-http-netty。其中有一个ReactorNettyClientProvider

public class ReactorNettyClientProvider implements HttpClientProvider {
@Override
public HttpClient createInstance() {
return new NettyAsyncHttpClientBuilder().build();
}
}

因此,如果不指定 httpclient,BlobServiceClient将使用NettyAsyncHttpClient。一切都很好。

但是,如果azure-core-http-okhttp手动添加到依赖项中,并且您没有在 BlobServiceClientBuilder 中指定 httpclient。然后OkHttpClientProvider将被使用。

public class OkHttpClientProvider implements HttpClientProvider {
@Override
public HttpClient createInstance() {
return new OkHttpAsyncHttpClientBuilder().build();
}
}

然后发生了有趣的事情,您将获得与发布相同的错误。所以,我想OkHttpAsyncHttpClient实现一定有问题。


溶液

  1. 更改为使用 azure-core-http-netty

  2. 更改为使用 Azure 存储客户端 SDK Microsoft

示例:

public static void main(String[] args) throws Exception {
String connectionString = "DefaultEndpointsProtocol=https;AccountName=storagetest789;AccountKey=G36mc*******j1w==;EndpointSuffix=core.windows.net";
StorageCredentials credentials = StorageCredentials.tryParseCredentials(connectionString);
// Set proxy
//OperationContext.setDefaultProxy(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("127.0.0.1",8888)));
CloudStorageAccount storageAccount = new CloudStorageAccount(credentials, true);
CloudBlobClient blobClient = storageAccount.createCloudBlobClient();
CloudBlobContainer blobContainer = blobClient.getContainerReference("function");
blobContainer.createIfNotExists();
//        for (ListBlobItem blobItem : blobContainer.listBlobs()) {
//            System.out.println(blobItem.getUri());
//            CloudBlockBlob blockBlob = new CloudBlockBlob(blobItem.getUri(), credentials);
//            blockBlob.downloadToFile("d:\test\"+blockBlob.getName());
//        }
for (ListBlobItem blobItem : blobContainer.listBlobs()) {
String[] parts = blobItem.getUri().toString().split("/");
CloudBlockBlob blockBlob = blobContainer.getBlockBlobReference(parts[parts.length - 1]);
blockBlob.downloadToFile("d:/test/" + blockBlob.getName());
}
}

最新更新