在Webflux中优雅关机时letucereactivessubscription中的取消异常 &g



我有一个spring web flux应用程序。我最近通过以下配置在我的应用程序中启用了优雅关机

server:
port: 8080
http2:
enabled: true
compression:
enabled: true
mime-types: text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json
min-response-size: 10240
shutdown: graceful
spring:
data:
redis:
repositories:
enabled: false
lifecycle:
timeout-per-shutdown-phase: 40s
redis:
lettuce:
shutdown-timeout: 10s
cluster:
nodes: 127.0.0.1:30001,127.0.0.1:30002,127.0.0.1:30003,127.0.0.1:30004,127.0.0.1:30005,127.0.0.1:30006


weather:
realtime_weather_redis_topic: realtime_weather_data

我也在我的应用程序中使用Redis作为pub-sub。当正常关闭发生时,ReactiveRedisMessageListenerContainer会出现异常。


/Library/Java/JavaVirtualMachines/adoptopenjdk-16.jdk/Contents/Home/bin/java -agentlib:jdwp=transport=dt_socket,address=127.0.0.1:51119,suspend=y,server=n -javaagent:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlinx/kotlinx-coroutines-core-jvm/1.5.0/d8cebccdcddd029022aa8646a5a953ff88b13ac8/kotlinx-coroutines-core-jvm-1.5.0.jar -Dspring.output.ansi.enabled=always -javaagent:/Users/zakirsaifi/Library/Caches/JetBrains/IntelliJIdea2021.2/captureAgent/debugger-agent.jar -Dfile.encoding=UTF-8 -classpath /Users/zakirsaifi/mydata/Projects/Generic-Object-Mapper/build/classes/kotlin/main:/Users/zakirsaifi/mydata/Projects/Generic-Object-Mapper/build/resources/main:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-starter-webflux/2.5.1/637400edf0a1ac9478cfc3d3940017b93b4e39a8/spring-boot-starter-webflux-2.5.1.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/com.fasterxml.jackson.module/jackson-module-kotlin/2.12.3/c8da7d998ed2e9253e62fdf15d1112c54b3ce6db/jackson-module-kotlin-2.12.3.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.projectreactor.kotlin/reactor-kotlin-extensions/1.1.3/bb87a66e4a6b19237dfcf1899845f3e346a72b95/reactor-kotlin-extensions-1.1.3.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-reflect/1.5.10/d6c70f3c0df2457ea0095c61c1fc1188017dc3bc/kotlin-reflect-1.5.10.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlinx/kotlinx-coroutines-reactor/1.5.0/de8861693b3746fa695724933ad831f45e60d8ca/kotlinx-coroutines-reactor-1.5.0.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-jdk8/1.5.10/3f4af7aff21c4ec46e3cdd645639d0a63a68d3d0/kotlin-stdlib-jdk8-1.5.10.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-starter-data-redis-reactive/2.5.1/a28ac2e835d76ed2313d6e8590ede18762749404/spring-boot-starter-data-redis-reactive-2.5.1.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-starter-json/2.5.1/6caadea880eea629af40bc30163b6be047999b95/spring-boot-starter-json-2.5.1.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-starter/2.5.1/2d43f64d7016e3a3a3bb5173b30d4cf67dd8c310/spring-boot-starter-2.5.1.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-starter-reactor-netty/2.5.1/3a80bd98a338af39f2ff9ba766a998d7a583c45d/spring-boot-starter-reactor-netty-2.5.1.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework/spring-webflux/5.3.8/4ebebf6f53ca2d5f75672009a8598462e2e1d279/spring-webflux-5.3.8.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework/spring-web/5.3.8/daa288e67b0f2e09a033500d5ce8406677c5045c/spring-web-5.3.8.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/com.fasterxml.jackson.core/jackson-databind/2.12.3/d6153f8fc60c479ab0f9efb35c034526436a4953/jackson-databind-2.12.3.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/com.fasterxml.jackson.core/jackson-annotations/2.12.3/7275513412694a1aafd08c0287f48469fa0e6e17/jackson-annotations-2.12.3.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.projectreactor/reactor-core/3.4.6/d8d52418db9eea651d4772a619d5c9ea820449b7/reactor-core-3.4.6.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib/1.5.10/da6a904b132f0402fa4d79169a3c1770598d4702/kotlin-stdlib-1.5.10.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlinx/kotlinx-coroutines-reactive/1.5.0/4e68a7d021b3a79632f2e746b410b955f0931e05/kotlinx-coroutines-reactive-1.5.0.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-jdk7/1.5.10/c49d0703d16c6cb1526cc07b9b46486da1dd8a60/kotlin-stdlib-jdk7-1.5.10.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.github.microutils/kotlin-logging-jvm/2.0.8/f86b95ae10ee142b5b3f3bc52e372acb3f8ebdb3/kotlin-logging-jvm-2.0.8.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-starter-data-redis/2.5.1/bea4ef1748104f971e718e4e4ae64dfdde8a1f5f/spring-boot-starter-data-redis-2.5.1.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/com.fasterxml.jackson.datatype/jackson-datatype-jdk8/2.12.3/77424ea087313312e308dae5ff8445608aabb5e1/jackson-datatype-jdk8-2.12.3.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/com.fasterxml.jackson.datatype/jackson-datatype-jsr310/2.12.3/f69c636438dcf19c49960c1fe8901320ab85f989/jackson-datatype-jsr310-2.12.3.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/com.fasterxml.jackson.module/jackson-module-parameter-names/2.12.3/592a882beaf1bd57b8fe960b937a2706b090b4d7/jackson-module-parameter-names-2.12.3.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-autoconfigure/2.5.1/d5f3dff4ddc3bde2dcd38f410567e0f2c28d8ba7/spring-boot-autoconfigure-2.5.1.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot/2.5.1/eb046cc9d226cb8ab131283c526d132ba71ed39c/spring-boot-2.5.1.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework.boot/spring-boot-starter-logging/2.5.1/2ea28fc306f97c054bee83a8ae5f787c66852ade/spring-boot-starter-logging-2.5.1.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/jakarta.annotation/jakarta.annotation-api/1.3.5/59eb84ee0d616332ff44aba065f3888cf002cd2d/jakarta.annotation-api-1.3.5.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework/spring-core/5.3.8/da9b87dacaa5bbf80fad0f7b483988372a00a152/spring-core-5.3.8.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.yaml/snakeyaml/1.28/7cae037c3014350c923776548e71c9feb7a69259/snakeyaml-1.28.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.projectreactor.netty/reactor-netty-http/1.0.7/95106e62dce6e83933823bc54f4177c2a60c8028/reactor-netty-http-1.0.7.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework/spring-beans/5.3.8/3d66fed1eebfcd119efcabc6218c813700a21ed/spring-beans-5.3.8.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/com.fasterxml.jackson.core/jackson-core/2.12.3/deb23fe2a7f2b773e18ced2b50d4acc1df8fa366/jackson-core-2.12.3.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.reactivestreams/reactive-streams/1.0.3/d9fb7a7926ffa635b3dcaa5049fb2bfa25b3e7d0/reactive-streams-1.0.3.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.jetbrains/annotations/13.0/919f0dfe192fb4e063e7dacadee7f8bb9a2672a9/annotations-13.0.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlin/kotlin-stdlib-common/1.5.10/6b84d926e28493be69daf673e40076f89492ef7/kotlin-stdlib-common-1.5.10.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.jetbrains.kotlinx/kotlinx-coroutines-core-jvm/1.5.0/d8cebccdcddd029022aa8646a5a953ff88b13ac8/kotlinx-coroutines-core-jvm-1.5.0.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.30/b5a4b6d16ab13e34a88fae84c35cd5d68cac922c/slf4j-api-1.7.30.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.lettuce/lettuce-core/6.1.2.RELEASE/6799c1d7b8e459b7c01dd1cadb7443d2869569b8/lettuce-core-6.1.2.RELEASE.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework.data/spring-data-redis/2.5.1/f31408dde7d7c7197db287aec00cf01c5754aa0d/spring-data-redis-2.5.1.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework/spring-context/5.3.8/c367a05423e963c222e38a6a88b97d44de3880ca/spring-context-5.3.8.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/ch.qos.logback/logback-classic/1.2.3/7c4f3c474fb2c041d8028740440937705ebb473a/logback-classic-1.2.3.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.apache.logging.log4j/log4j-to-slf4j/2.14.1/ce8a86a3f50a4304749828ce68e7478cafbc8039/log4j-to-slf4j-2.14.1.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.slf4j/jul-to-slf4j/1.7.30/d58bebff8cbf70ff52b59208586095f467656c30/jul-to-slf4j-1.7.30.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework/spring-jcl/5.3.8/a143c8618eb2be8674c3cf132d9a5c953bb5488/spring-jcl-5.3.8.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.projectreactor.netty/reactor-netty-core/1.0.7/f5819084c49d6651a3be5805c5e128e535bd8eb4/reactor-netty-core-1.0.7.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.netty/netty-codec-http2/4.1.65.Final/b98d8554d43bbf379883005d043484bafd47cf3/netty-codec-http2-4.1.65.Final.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.netty/netty-codec-http/4.1.65.Final/fdf0886e1a615ab4ccea96830d7e663a4349180/netty-codec-http-4.1.65.Final.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.netty/netty-resolver-dns-native-macos/4.1.65.Final/47c780789fbb6e5ff085ce4258e99cbd768a117b/netty-resolver-dns-native-macos-4.1.65.Final-osx-x86_64.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.netty/netty-resolver-dns/4.1.65.Final/e8bd69d605d08b2257c2ee17cb959cc54302484e/netty-resolver-dns-4.1.65.Final.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.netty/netty-transport-native-epoll/4.1.65.Final/ed1ce93ceaaec02c7d3d88daf82360745499747/netty-transport-native-epoll-4.1.65.Final-linux-x86_64.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.netty/netty-handler/4.1.65.Final/28313a14ddd2fc312f75e8f21a5a12ffac4ef0b6/netty-handler-4.1.65.Final.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.netty/netty-transport/4.1.65.Final/14c0c0356af101fb7f6e2ba77b3e68d7b21dc37b/netty-transport-4.1.65.Final.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.netty/netty-common/4.1.65.Final/3e5ad41e33add0fad197942e38a509f868aa2c5a/netty-common-4.1.65.Final.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework.data/spring-data-keyvalue/2.5.1/91560471e3b1aa21b96f1a50e32ca914c480e303/spring-data-keyvalue-2.5.1.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework/spring-context-support/5.3.8/4be8e4dab3197ea0232fbe146635385093049bad/spring-context-support-5.3.8.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework/spring-tx/5.3.8/3202859a7e9560110f1fe8284c28feb009c6f460/spring-tx-5.3.8.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework/spring-oxm/5.3.8/a8ce0994ae2f5a322fd453ee82fc81e0072258f2/spring-oxm-5.3.8.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework/spring-aop/5.3.8/1377f80f938b1fc7eabe9e6c4f6895e77e3bec40/spring-aop-5.3.8.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework/spring-expression/5.3.8/8a14547b76cbae3aeb02739e5b38e71835a6bbd8/spring-expression-5.3.8.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/ch.qos.logback/logback-core/1.2.3/864344400c3d4d92dfeb0a305dc87d953677c03c/logback-core-1.2.3.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.apache.logging.log4j/log4j-api/2.14.1/cd8858fbbde69f46bce8db1152c18a43328aae78/log4j-api-2.14.1.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.netty/netty-handler-proxy/4.1.65.Final/494094032106f27cb19f67a2ce88570d974438de/netty-handler-proxy-4.1.65.Final.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.netty/netty-codec/4.1.65.Final/4a171d689d44df38d1b5b09cedee8584f858e702/netty-codec-4.1.65.Final.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.netty/netty-buffer/4.1.65.Final/9654d248812add4aa36a6201fab88ac5f13028c3/netty-buffer-4.1.65.Final.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.netty/netty-transport-native-unix-common/4.1.65.Final/6aea23759e5325309028ed239885fcd5a9075c66/netty-transport-native-unix-common-4.1.65.Final.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.netty/netty-codec-dns/4.1.65.Final/6aa72620a8cdd7c9414f45a0c9439fdcab060350/netty-codec-dns-4.1.65.Final.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.netty/netty-resolver/4.1.65.Final/561ff144d7ab425d74a2c54e8fb78865a6986063/netty-resolver-4.1.65.Final.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/org.springframework.data/spring-data-commons/2.5.1/c950ca1a05e928e9fb75420b4ac07713428e9969/spring-data-commons-2.5.1.jar:/Users/zakirsaifi/.gradle/caches/modules-2/files-2.1/io.netty/netty-codec-socks/4.1.65.Final/6a9107f2be2b2566adf491dc53fd34b0faf9d13d/netty-codec-socks-4.1.65.Final.jar:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar com.reactive.sse.SseServerWebFluxApplicationKt
Connected to the target VM, address: '127.0.0.1:51119', transport: 'socket'
OpenJDK 64-Bit Server VM warning: Sharing is only supported for boot loader classes because bootstrap classpath has been appended
.   ____          _            __ _ _
/\ / ___'_ __ _ _(_)_ __  __ _    
( ( )___ | '_ | '_| | '_ / _` |    
\/  ___)| |_)| | | | | || (_| |  ) ) ) )
'  |____| .__|_| |_|_| |___, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot ::                (v2.5.1)
2021-08-12 12:32:12.044  INFO 13916 --- [           main] c.r.sse.SseServerWebFluxApplicationKt    : Starting SseServerWebFluxApplicationKt using Java 16.0.1 on Zakirs-MacBook-Pro.local with PID 13916 (/Users/zakirsaifi/mydata/Projects/Generic-Object-Mapper/build/classes/kotlin/main started by zakirsaifi in /Users/zakirsaifi/mydata/Projects/Generic-Object-Mapper)
2021-08-12 12:32:12.046  INFO 13916 --- [           main] c.r.sse.SseServerWebFluxApplicationKt    : No active profile set, falling back to default profiles: default
2021-08-12 12:32:16.255  INFO 13916 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port 8080
2021-08-12 12:32:16.274  INFO 13916 --- [           main] c.r.sse.SseServerWebFluxApplicationKt    : Started SseServerWebFluxApplicationKt in 10.088 seconds (JVM running for 11.045)
Disconnected from the target VM, address: '127.0.0.1:51119', transport: 'socket'
2021-08-12 12:34:15.333  INFO 13916 --- [ionShutdownHook] o.s.b.w.embedded.netty.GracefulShutdown  : Commencing graceful shutdown. Waiting for active requests to complete
2021-08-12 12:34:15.335  INFO 13916 --- [ netty-shutdown] o.s.b.w.embedded.netty.GracefulShutdown  : Graceful shutdown complete
2021-08-12 12:34:15.347 ERROR 13916 --- [ioEventLoop-5-8] c.r.s.s.WeatherEventRedisConsumer        : Error deserializing WeatherInfo event in redis consumer
java.util.concurrent.CancellationException: Disconnected
at reactor.core.publisher.FluxPublish$PublishSubscriber.disconnectAction(FluxPublish.java:314) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxPublish$PublishSubscriber.dispose(FluxPublish.java:305) ~[reactor-core-3.4.6.jar:3.4.6]
at org.springframework.data.redis.connection.lettuce.LettuceReactiveSubscription$State.terminate(LettuceReactiveSubscription.java:288) ~[spring-data-redis-2.5.1.jar:2.5.1]
at org.springframework.data.redis.connection.lettuce.LettuceReactiveSubscription.lambda$cancel$6(LettuceReactiveSubscription.java:177) ~[spring-data-redis-2.5.1.jar:2.5.1]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:88) ~[reactor-core-3.4.6.jar:3.4.6]
at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onComplete(RedisPublisher.java:896) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.RedisPublisher$State.onAllDataRead(RedisPublisher.java:698) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.RedisPublisher$State$3.read(RedisPublisher.java:608) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.RedisPublisher$State$3.onDataAvailable(RedisPublisher.java:565) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.RedisPublisher$RedisSubscription.onDataAvailable(RedisPublisher.java:326) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.RedisPublisher$RedisSubscription.onAllDataRead(RedisPublisher.java:341) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.RedisPublisher$SubscriptionCommand.doOnComplete(RedisPublisher.java:778) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:65) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:63) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.cluster.ClusterCommand.complete(ClusterCommand.java:65) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.pubsub.PubSubCommandHandler.completeCommand(PubSubCommandHandler.java:260) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.pubsub.PubSubCommandHandler.notifyPushListeners(PubSubCommandHandler.java:220) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:646) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.pubsub.PubSubCommandHandler.decode(PubSubCommandHandler.java:112) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:598) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]
2021-08-12 12:34:15.348 ERROR 13916 --- [ioEventLoop-5-8] reactor.core.publisher.Operators         : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.util.concurrent.CancellationException: Disconnected
Caused by: java.util.concurrent.CancellationException: Disconnected
at reactor.core.publisher.FluxPublish$PublishSubscriber.disconnectAction(FluxPublish.java:314) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.FluxPublish$PublishSubscriber.dispose(FluxPublish.java:305) ~[reactor-core-3.4.6.jar:3.4.6]
at org.springframework.data.redis.connection.lettuce.LettuceReactiveSubscription$State.terminate(LettuceReactiveSubscription.java:288) ~[spring-data-redis-2.5.1.jar:2.5.1]
at org.springframework.data.redis.connection.lettuce.LettuceReactiveSubscription.lambda$cancel$6(LettuceReactiveSubscription.java:177) ~[spring-data-redis-2.5.1.jar:2.5.1]
at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:236) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2057) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onComplete(MonoPeekTerminal.java:299) ~[reactor-core-3.4.6.jar:3.4.6]
at reactor.core.publisher.MonoIgnoreElements$IgnoreElementsSubscriber.onComplete(MonoIgnoreElements.java:88) ~[reactor-core-3.4.6.jar:3.4.6]
at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onComplete(RedisPublisher.java:896) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.RedisPublisher$State.onAllDataRead(RedisPublisher.java:698) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.RedisPublisher$State$3.read(RedisPublisher.java:608) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.RedisPublisher$State$3.onDataAvailable(RedisPublisher.java:565) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.RedisPublisher$RedisSubscription.onDataAvailable(RedisPublisher.java:326) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.RedisPublisher$RedisSubscription.onAllDataRead(RedisPublisher.java:341) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.RedisPublisher$SubscriptionCommand.doOnComplete(RedisPublisher.java:778) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:65) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.protocol.CommandWrapper.complete(CommandWrapper.java:63) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.cluster.ClusterCommand.complete(ClusterCommand.java:65) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.pubsub.PubSubCommandHandler.completeCommand(PubSubCommandHandler.java:260) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.pubsub.PubSubCommandHandler.notifyPushListeners(PubSubCommandHandler.java:220) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:646) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.pubsub.PubSubCommandHandler.decode(PubSubCommandHandler.java:112) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:598) ~[lettuce-core-6.1.2.RELEASE.jar:6.1.2.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.65.Final.jar:4.1.65.Final]
at java.base/java.lang.Thread.run(Thread.java:831) ~[na:na]

Process finished with exit code 130 (interrupted by signal 2: SIGINT)

在正常关机的情况下,不应该出现这种异常。我怎样才能阻止这一切。

1。复述,配置


@Configuration
class RedisConfig {
@Value("${weather.realtime_weather_redis_topic}")
lateinit var realtimeWeatherRedisTopic: String
@Bean
fun weatherRedisTopic(): ChannelTopic = ChannelTopic(realtimeWeatherRedisTopic)
@Bean
fun redisContainer(factory: ReactiveRedisConnectionFactory): ReactiveRedisMessageListenerContainer {
return ReactiveRedisMessageListenerContainer(factory)
}
}

2。复述,消费者


@Service
class WeatherEventRedisConsumer(
private val reactiveMsgListenerContainer: ReactiveRedisMessageListenerContainer,
private val weatherDataRedisTopic: ChannelTopic,
private val mapper: ObjectMapper
) {
private val logger = KotlinLogging.logger {}

lateinit var listener: WebsocketEventListenerInterface

fun register(listener: WebsocketEventListenerInterface?) {
this.listener = listener!!
}

fun onEvent(event: WeatherInfoEvent?) {
if (this::listener.isInitialized) {
listener.onData(event)
}
}

fun onComplete() {
listener.processComplete()
}

@PostConstruct
private fun consumerInit() {
this.reactiveMsgListenerContainer.receive(weatherDataRedisTopic)
.map(ReactiveSubscription.Message<String, String>::getMessage)
.map { msg -> mapper.deserialize<WeatherInfoEvent>(msg) }
.doOnError { e -> logger.error(e) { "Error deserializing WeatherInfo event in redis consumer" } }
.subscribe { tick: WeatherInfoEvent -> onEvent(tick) }
}
}

所有代码都在这里https://github.com/Zaky7/Generic-Object-Mapper

你期望发生什么事?

ReactiveRedisMessageListenerContainer.destroy()取消所有活动订阅,CancellationException是您从取消活动订阅中看到的信号。

使用subscribe { tick: WeatherInfoEvent -> onEvent(tick) }有点不幸,因为您没有消耗错误信号,这就是为什么您看到ErrorCallbackNotImplemented

最新更新