Spring boot: 2.5.6, Kotlin: 1.5.31, kotlinx-coroutines-reactive:1.5.2
我正在尝试下载pdf并立即将其上传到http://localhost:3000/upload
,如下所示:
Class Mediator(val dataBuffer: Flux<DataBuffer>, val url: String)
runBlocking {
getMediator()
.flatMap { uploadFile(it) }
.subscribe()
}
private fun getMediator(): Mono<Mediator> {
return WebClient.create(
"https://server.com/assets/file.pdf")
.get()
.exchangeToMono { response ->
Mono.just(
Mediator(response.bodyToFlux(DataBuffer::class.java), "http://localhost:3000/upload"))
}
}
private fun uploadFile(mediator: Mediator): Mono<ResponseEntity<Void>> {
return WebClient.create(mediator.url)
.put()
.contentType(MediaType.APPLICATION_OCTET_STREAM)
.body(mediator.dataBuffer)
.retrieve()
.toBodilessEntity()
}
当我运行这段代码时,我可以看到我的服务器接收到一个调用并创建了一个空文件(大小为0)。不会引发任何错误。我无法解释为什么这段代码不能将字节传输到我的上传端点。在测试时,我意识到问题可能是在我的Mediator
对象中,getMediator
正在返回。uploadFile
中的body
不像被包裹的Flux<DataBuffer>
我找到了一个方法。只是回答我自己的问题,也许这对别人有帮助。
data class Mediator(val data: Flux<DataBuffer>, val url: String)
data class UploadUrl(val url: String)
runBlocking {
getFileDataBuffer(url)
.flatMap { prepareUpload(it.headers.contentLength, it.body) }
.flatMap { uploadFile(it.url, it.dataBuffer!!)}
}.subscribe()
private fun getFileDataBuffer(url:String): Mono<ResponseEntity<Flux<DataBuffer>>> {
return WebClient.create(url).get().retrieve().toEntityFlux()
}
private fun prepareUpload(length: Long, data: Flux<DataBuffer>?): Mono<Mediator> {
return WebClient.create("http://localhost:3000/prepare")
.post()
.contentType(MediaType.APPLICATION_JSON)
.bodyValue("""{"size":${length}}""")
.exchangeToMono { res ->
res.bodyToMono(String::class.java).map { parseJson<UploadUrl>(it) }.map {
Mediator(data, it.url)
}
}
}
private fun uploadFile(url: String, data: Flux<DataBuffer>): Mono<ResponseEntity<Void>> {
return WebClient.create(url)
.put()
.contentType(MediaType.APPLICATION_OCTET_STREAM)
.body(data)
.retrieve()
.toBodilessEntity()
}