创建订阅者以从pubSub Topic中同步提取消息。我使用了文档示例中的确切设置。
然而,当我试图关闭订阅者并创建新订阅者时,我面临以下问题:
[pool-1-thread-8] ERROR i.g.i.ManagedChannelOrphanWrapper - *~*~*~ Channel ManagedChannelImpl{logId=47567, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>
(ManagedChannelOrphanWrapper.java:93)
at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:53)
at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:44)
at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:612)
at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.
java:261) at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel
(InstantiatingGrpcChannelProvider.java:340)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.access$1600
(InstantiatingGrpcChannelProvider.java:73)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider$1.createSingleChannel
(InstantiatingGrpcChannelProvider.java:214)
at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:72)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.
createChannel(InstantiatingGrpcChannelProvider.java:221)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider
.getTransportChannel(InstantiatingGrpcChannelProvider.java:204)
at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:169)
at com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub.create(GrpcSubscriberStub.java:272)
at eu.hermes.esb.cloud.service.SubService.getSubscriber(SubService.java:48)
at eu.hermes.esb.cloud.runnables.SubTask.run(SubTask.java:31)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
创建和关闭订阅者的代码:
while (compositeConfigurationElement.getSubscriber().isEnabled()) {
SubscriberStub subscriber;
try {
subscriber = subService.getSubscriber(compositeConfigurationElement);
subService.pullAndSend(compositeConfigurationElement, subscriptionName, subscriber);
subscriber.shutdown();
subscriber.awaitTermination(20, TimeUnit.SECONDS);
} catch (DeadlineExceededException e) {
// this is a know issue at the moment ->
// 1- https://github.com/googleapis/google-cloud-java/issues/4220
// 2- https://stackoverflow.com/questions/60012138/google-cloud-function-pulling-from-pub-sub-subscription-throws-exception-deadl
// 3- https://github.com/googleapis/google-cloud-java/issues/3648
log.warn("Deadline exceeded for subscription");
} catch (Exception e) {
log.error("Exception with the subscription service", e);
try {
TimeUnit.SECONDS.sleep(10);
} catch (Exception e2) {
log.error("Exception during wait", e2);
}
}
}
so,如何摆脱这个错误信息? 和虽然我正在捕捉所有异常,但这个异常究竟是如何抛出的?
我使用google-cloud-pubsub:1.110.3
所以,我错过了,当一个空订阅发生时,拉取请求抛出一个我已经捕获的DeadlineExceededException
,但我错过了,在这种情况下订阅者没有关闭。
所以解决方案是在catch或finally块中关闭订阅者。