block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3



I am using Spring Boot 2.1.0.

public Mono<ServerResponse> getConvertXmlToJson(ServerRequest serverRequest) {
    Mono<String> requestString = serverRequest.bodyToMono(String.class);
    Mono<String> response = this.convertFormatService.convertXmlToJson(requestString);
    return response.subscribeOn(elastic()).flatMap(rta -> ok()
            .contentType(APPLICATION_JSON_UTF8)
            .body(fromObject(rta)))
            .switchIfEmpty(ServerResponse.badRequest().build());
}

It calls this method in another bean:

private Mono<String> converter(String function, Mono<String> stringToConverter, String setting) {
    log.info("salida, {}", stringToConverter.block());
    try {
        Object result = invocable.invokeFunction(function, stringToConverter, setting);
        log.info("{}", result);
        return Mono.just((String) result);
    } catch (Exception e) {
        throw new ConverterException(e.getMessage());
    }
}

I need to pass a String value rather than a Mono. When I pass the Mono itself, the result comes back empty.

When I try to get the String value out of the Mono, I get the following error:

{
"timestamp": "2018-11-13T03:29:02.161+0000",
"path": "/v1/convert",
"status": 500,
"error": "Internal Server Error",
"message": "block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3",
"trace": "java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3rntat reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:77)rntat reactor.core.publisher.Mono.block(Mono.java:1475)rntat lf.co.com.examplespringconverterxmljson.services.impl.ConvertFormatServiceImpl.converter(ConvertFormatServiceImpl.java:58)rntat lf.co.com.examplespringconverterxmljson.services.impl.ConvertFormatServiceImpl.convertXmlToJson(ConvertFormatServiceImpl.java:49)rntat lf.co.com.examplespringconverterxmljson.routing.handler.ConvertHandler.getConvertXmlToJson(ConvertHandler.java:25)rntat org.springframework.web.reactive.function.server.support.HandlerFunctionAdapter.handle(HandlerFunctionAdapter.java:61)rntat org.springframework.web.reactive.DispatcherHandler.invokeHandler(DispatcherHandler.java:159)rntat org.springframework.web.reactive.DispatcherHandler.lambda$handle$1(DispatcherHandler.java:151)rntat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:118)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:373)rntat reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:373)rntat reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:373)rntat reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275)rntat reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:373)rntat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)rntat 
reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:373)rntat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:373)rntat reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:373)rntat reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:373)rntat reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:373)rntat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2041)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:458)rntat reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:155)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:458)rntat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1849)rntat reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1723)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:442)rntat reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:86)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:442)rntat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)rntat reactor.core.publisher.MonoCallableOnAssembly.subscribe(MonoCallableOnAssembly.java:82)rntat reactor.core.publisher.MonoMap.subscribe(MonoMap.java:55)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat 
reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:74)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.core.publisher.Mono.subscribe(Mono.java:3590)rntat reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:442)rntat reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:212)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:442)rntat reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:139)rntat reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:63)rntat reactor.core.publisher.FluxOnAssembly.subscribe(FluxOnAssembly.java:164)rntat reactor.core.publisher.FluxConcatMap.subscribe(FluxConcatMap.java:121)rntat reactor.core.publisher.FluxOnAssembly.subscribe(FluxOnAssembly.java:164)rntat reactor.core.publisher.MonoNext.subscribe(MonoNext.java:40)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat 
reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:373)rntat reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:373)rntat reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476)rntat reactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:389)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onNext(FluxOnAssembly.java:373)rntat reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2041)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.request(FluxOnAssembly.java:458)rntat reactor.core.publisher.MonoProcessor.onSubscribe(MonoProcessor.java:399)rntat reactor.core.publisher.FluxOnAssembly$OnAssemblySubscriber.onSubscribe(FluxOnAssembly.java:442)rntat reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)rntat reactor.core.publisher.MonoCallableOnAssembly.subscribe(MonoCallableOnAssembly.java:82)rntat reactor.core.publisher.Mono.subscribe(Mono.java:3590)rntat reactor.core.publisher.MonoProcessor.add(MonoProcessor.java:531)rntat reactor.core.publisher.MonoProcessor.subscribe(MonoProcessor.java:444)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)rntat 
reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:71)rntat reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.core.publisher.Mono.subscribe(Mono.java:3590)rntat reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172)rntat reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:71)rntat reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:70)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:71)rntat reactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61)rntat reactor.core.publisher.MonoOnAssembly.subscribe(MonoOnAssembly.java:76)rntat reactor.netty.http.server.HttpServerHandle.onStateChange(HttpServerHandle.java:64)rntat reactor.netty.tcp.TcpServerBind$ChildObserver.onStateChange(TcpServerBind.java:226)rntat reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:431)rntat reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)rntat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)rntat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)rntat 
io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)rntat reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:159)rntat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)rntat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)rntat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)rntat io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)rntat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)rntat io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297)rntat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:413)rntat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)rntat io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)rntat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)rntat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)rntat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)rntat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)rntat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)rntat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)rntat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)rntat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)rntat 
io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:628)rntat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:563)rntat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)rntat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)rntat io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)rntat java.lang.Thread.run(Thread.java:748)rntSuppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: nAssembly trace from producer [reactor.core.publisher.MonoFlatMap] :ntreactor.core.publisher.Mono.flatMap(Mono.java:2454)ntorg.springframework.web.reactive.DispatcherHandler.handle(DispatcherHandler.java:151)ntorg.springframework.web.server.handler.DefaultWebFilterChain.lambda$filter$0(DefaultWebFilterChain.java:122)ntreactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)ntreactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)ntreactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)ntreactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1476)ntreactor.core.publisher.MonoProcessor.onNext(MonoProcessor.java:389)ntreactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2041)ntreactor.core.publisher.MonoProcessor.onSubscribe(MonoProcessor.java:399)ntreactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)ntreactor.core.publisher.Mono.subscribe(Mono.java:3590)ntreactor.core.publisher.MonoProcessor.add(MonoProcessor.java:531)ntreactor.core.publisher.MonoProcessor.subscribe(MonoProcessor.java:444)ntreactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)ntreactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)ntreactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)ntreactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)ntreactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:
44)ntreactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61)ntreactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)ntreactor.core.publisher.Mono.subscribe(Mono.java:3590)ntreactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172)ntreactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56)ntreactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java:70)ntreactor.core.publisher.MonoPeekTerminal.subscribe(MonoPeekTerminal.java:61)ntreactor.netty.http.server.HttpServerHandle.onStateChange(HttpServerHandle.java:64)ntreactor.netty.tcp.TcpServerBind$ChildObserver.onStateChange(TcpServerBind.java:226)ntreactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:431)ntreactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)ntio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)ntio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)ntio.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)ntreactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:159)ntio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)ntio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)ntio.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)ntio.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)ntio.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)ntio.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297)ntio.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.ja
va:413)ntio.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)ntio.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)ntio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)ntio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)ntio.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)ntio.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1434)ntio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)ntio.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)ntio.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:965)ntio.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163)ntio.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:628)ntio.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:563)ntio.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:480)ntio.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:442)ntio.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884)nError has been observed by the following operator(s):nt|_tMono.flatMap ⇢ org.springframework.web.reactive.DispatcherHandler.handle(DispatcherHandler.java:151)nt|_tMono.flatMap ⇢ org.springframework.web.reactive.DispatcherHandler.handle(DispatcherHandler.java:152)nt|_tMono.defer ⇢ org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:119)nt|_tMono.flatMap ⇢ org.springframework.web.filter.reactive.HiddenHttpMethodFilter.filter(HiddenHttpMethodFilter.java:90)nt|_tMono.defer ⇢ 
org.springframework.web.server.handler.DefaultWebFilterChain.filter(DefaultWebFilterChain.java:119)nrn"
}

If anyone has a solution, I would appreciate it.

Perhaps change the converter method (and convertXmlToJson) to take a String instead of a Mono<String>.

Then you can do something like this:

Mono<String> response = requestString.flatMap(s -> this.convertFormatService.convertXmlToJson(s));

I think there is probably more cleanup that could be done, but hopefully this gets you closer.
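To make the suggestion concrete, here is a minimal, self-contained sketch of what the service side might look like once converter and convertXmlToJson accept a plain String. It only needs reactor-core on the classpath. The invokeScript helper, the "xmlToJson" function name, and the "{}" setting are placeholders I made up to stand in for invocable.invokeFunction(...) and your real arguments; they are not from your code. Wrapping the call in Mono.fromCallable also means an exception thrown by the script arrives as an error signal instead of being thrown while the pipeline is assembled.

```java
import reactor.core.publisher.Mono;

public class ConvertSketch {

    // Hypothetical stand-in for invocable.invokeFunction(function, xml, setting);
    // it just builds a small JSON string so the example can run without a script engine.
    static String invokeScript(String function, String xml, String setting) {
        return "{\"function\":\"" + function + "\",\"length\":" + xml.length() + "}";
    }

    // Takes the already-resolved String, so no block() is needed here.
    // fromCallable defers the call until subscription and turns exceptions into onError.
    static Mono<String> converter(String function, String xml, String setting) {
        return Mono.fromCallable(() -> invokeScript(function, xml, setting));
    }

    // "xmlToJson" and "{}" are assumed values, used only for illustration.
    static Mono<String> convertXmlToJson(String xml) {
        return converter("xmlToJson", xml, "{}");
    }

    public static void main(String[] args) {
        // Plays the role of serverRequest.bodyToMono(String.class) in the handler.
        Mono<String> requestString = Mono.just("<a>1</a>");

        // flatMap hands the String to the service once it is available.
        Mono<String> response = requestString.flatMap(ConvertSketch::convertXmlToJson);

        // block() is fine here only because main is not a Netty event-loop thread.
        System.out.println(response.block());
    }
}
```

In the real handler you would return the flatMap result and let WebFlux subscribe to it, rather than calling block() yourself. If the request body is empty, flatMap never runs and the Mono stays empty, so your existing switchIfEmpty(badRequest) still applies.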
