我有一个反应式核心WebClient要发布到给定的端点。有效载荷是JsonNodes的流量,内容类型为application/stream+json
JsonNode response = localEP.post().uri( "/createItem" )
.contentType(MediaType.APPLICATION_STREAM_JSON)
.body( BodyInserters.fromPublisher(itemData, JsonNode.class ))
.retrieve()
.bodyToMono( JsonNode.class )
.block();
在服务器端,我尝试了Spring Controller风格和Spring Web Reactive FunctionHandler,以使用Flux的有效负载来处理上述调用的有效负载。
@PostMapping(path = "/dev/jobad/dynamo", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE)
@ResponseStatus(HttpStatus.CREATED)
public Flux<JsonNode> loadItems (@RequestBody Flux<JsonNode> items) {
items.subscribe(storage::add);
JsonNode response = new ObjectMapper().createObjectNode().put( "shady", "shade" );
return Flux.just( response );
}
返回到客户端总是可以的,但是服务器报告流量的内容是空的。如果我将(@RequestBody Flux<JsonNode> items
更改为(@RequestBody JsonNode items
,则有效载荷接收良好。WebClient日志似乎表明它已将数据写入连线并处理了响应。然而,机身似乎清空了
Reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.web.server.ServerWebInputException: Response status 400 with reason "Request body is missing: public reactor.core.publisher.Flux<com.fasterxml.jackson.databind.JsonNode> com.talroo.rest.JobResource.loadJobs(reactor.core.publisher.Flux<com.fasterxml.jackson.databind.JsonNode>)"
Caused by: org.springframework.web.server.ServerWebInputException:
Response status 400 with reason "Request body is missing: public
reactor.core.publisher.Flux<com.fasterxml.jackson.databind.JsonNode>
我需要做什么才能以Flux的身份处理帖子的请求主体?
首先,我不认为Spring正式支持直接从控制器读取/写入JacksonJsonNode
实例。您的应用程序应该请求一个域对象或类似Map<String, String>
的东西。
现在,在Jackson的模型中,JsonNode
表示JSON树中的任何节点——因为它是一棵树,所以你可以期望得到节点的Flux
,但你显然能够得到根节点——这解释了你所看到的行为。
因此,我认为您的应用程序应该依赖于更高级别的类,并让Jackson为您反序列化它们。
请注意,您的控制器实现也违反了一些规则:
- 您不应该在返回反应类型的方法中调用阻塞运算符,如
block
(您的控制器不会破坏此类型,而是关闭( - 您不应该破坏反应管道,并将请求的读取和响应的写入解耦;在您的控制器有机会读取整个请求之前,HTTP交换可能会被关闭。调用
subscribe
就是这样