Spring Reactive WebFlux在使用application/stream+json时报告空流量



我有一个反应式核心WebClient要发布到给定的端点。有效载荷是JsonNodes的流量,内容类型为application/stream+json

JsonNode response = localEP.post().uri( "/createItem" )
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body( BodyInserters.fromPublisher(itemData, JsonNode.class ))
.retrieve()
.bodyToMono( JsonNode.class )
.block();

在服务器端,我尝试了Spring Controller风格和Spring Web Reactive FunctionHandler,以使用Flux的有效负载来处理上述调用的有效负载。

@PostMapping(path = "/dev/jobad/dynamo", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
@ResponseStatus(HttpStatus.CREATED)
public Flux<JsonNode> loadItems (@RequestBody Flux<JsonNode> items) {
items.subscribe(storage::add);
JsonNode response = new ObjectMapper().createObjectNode().put( "shady", "shade" );
return Flux.just( response );
}

返回到客户端总是可以的,但是服务器报告流量的内容是空的。如果我将(@RequestBody Flux<JsonNode> items更改为(@RequestBody JsonNode items,则有效载荷接收良好。WebClient日志似乎表明它已将数据写入连线并处理了响应。然而,机身似乎清空了

Reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.web.server.ServerWebInputException: Response status 400 with reason "Request body is missing: public reactor.core.publisher.Flux<com.fasterxml.jackson.databind.JsonNode> com.talroo.rest.JobResource.loadJobs(reactor.core.publisher.Flux<com.fasterxml.jackson.databind.JsonNode>)"
Caused by: org.springframework.web.server.ServerWebInputException: 
Response status 400 with reason "Request body is missing: public 
reactor.core.publisher.Flux<com.fasterxml.jackson.databind.JsonNode>

我需要做什么才能以Flux的身份处理帖子的请求主体?

首先,我不认为Spring正式支持直接从控制器读取/写入JacksonJsonNode实例。您的应用程序应该请求一个域对象或类似Map<String, String>的东西。

现在,在Jackson的模型中,JsonNode表示JSON树中的任何节点——因为它是一棵树,所以你可以期望得到节点的Flux,但你显然能够得到根节点——这解释了你所看到的行为。

因此,我认为您的应用程序应该依赖于更高级别的类,并让Jackson为您反序列化它们。

请注意,您的控制器实现也违反了一些规则:

  • 您不应该在返回反应类型的方法中调用阻塞运算符,如block(您的控制器不会破坏此类型,而是关闭(
  • 您不应该破坏反应管道,并将请求的读取和响应的写入解耦;在您的控制器有机会读取整个请求之前,HTTP交换可能会被关闭。调用subscribe就是这样

最新更新