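I have been tasked with migrating some Reactor-based services to webflux.fn + coroutines. The service generates a PNG compass image. I can see the compass bytes right before they are returned from the new coroutine-based service to Postman. In Postman the request succeeds, but the response body is empty. I cannot work out why the Flow returns successfully yet carries no content. I would appreciate any suggestions.
Thanks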
>>>REQUEST
curl -X "POST" "http://localhost:8080/api/compass" \
     -H 'Content-Type: application/json; charset=utf-8' \
     -d $'{}'
>>>RESPONSE
HTTP/1.1 200 OK
transfer-encoding: chunked
Content-Type: image/png
Cache-Control: no-cache, no-store, max-age=0, must-revalidate
Pragma: no-cache
Expires: 0
X-Content-Type-Options: nosniff
X-Frame-Options: DENY
X-XSS-Protection: 1 ; mode=block
Referrer-Policy: no-referrer
connection: close
>>>WEBFLUX.FN ROUTER
@Component
class MyRouter(private val myHandler: MyHandler) {
    @Bean
    fun routes(myHandler: MyHandler) =
        coRouter {
            accept(APPLICATION_JSON).nest {
                ("/api".nest {
                    POST("/compass", myHandler::generateCompass)
                })
            }
        }
}
>>>HANDLER
override suspend fun generateCompass(request: ServerRequest): ServerResponse {
    return myService
        .generateCompass(request.awaitBody())
        .fold({ throw it }, {
            ok()
                .contentType(MediaType.IMAGE_PNG)
                .bodyAndAwait(it)
        })
}
>>>SERVICE
suspend fun generateCompass(
    request: CompassRequest
): Either<Throwable, Flow<ByteArray>> =
    Either.Right(
        flow<ByteArray> {
            MapCompass(request)
                .exportToRaster()
                .map { it.toByteArray() }
        })
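Eventually I found that there is a difference between flow { } and flowOf(). The flow { } builder only produces what its block passes to emit(), and my block never called emit(), so the flow completed empty. I needed flowOf() to stream my ByteArray. Everything works now.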
>>>CORRECTED CODE
private fun generateMapCompass(
    request: MapCompassRequest
): Either<Throwable, Flow<ByteArray>> =
    MapCompass()
        .exportToRaster(MediaType.Png, ImageDimension(1.0f, 1.0f))
        .map { baos ->
            baos.toByteArray()
        }
        .map { flowOf(it) }
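To make the difference concrete, here is a minimal sketch, assuming only kotlinx.coroutines on the classpath. The renderPng() function is a hypothetical stand-in for the real MapCompass export, and brokenFlow()/fixedFlow() are illustrative names that mirror the original and corrected service, not code from the project.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for MapCompass(...).exportToRaster().toByteArray().
fun renderPng(): ByteArray = byteArrayOf(0x89.toByte(), 0x50, 0x4E, 0x47)

// Mirrors the original service: the bytes are computed, but emit() is never called.
fun brokenFlow(): Flow<ByteArray> = flow {
    renderPng() // result is discarded, so the flow completes without elements
}

// Mirrors the corrected service: flowOf emits its argument as a single element.
fun fixedFlow(): Flow<ByteArray> = flowOf(renderPng())

// Helpers that collect each flow and report how many elements it produced.
fun brokenCount(): Int = runBlocking { brokenFlow().toList().size }
fun fixedCount(): Int = runBlocking { fixedFlow().toList().size }

fun main() {
    println("flow { } without emit: ${brokenCount()} element(s)")
    println("flowOf(bytes): ${fixedCount()} element(s)")
}
```

An empty flow is still a successful flow, which would explain the 200 OK with the image/png header and no body.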
>>>KOTLINX.COROUTINES SOURCE (FOR REFERENCE)
/**
 * An analogue of the [flow] builder that does not check the context of execution of the resulting flow.
 * Used in our own operators where we trust the context of invocations.
 */
@PublishedApi
internal inline fun <T> unsafeFlow(@BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit): Flow<T> {
    return object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            collector.block()
        }
    }
}
/**
 * Creates a flow that produces values from the specified `vararg`-arguments.
 *
 * Example of usage:
 * ```
 * flowOf(1, 2, 3)
 * ```
 */
public fun <T> flowOf(vararg elements: T): Flow<T> = flow {
    for (element in elements) {
        emit(element)
    }
}
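Since flowOf is itself just a flow { } block that calls emit() for each argument, the original service could probably also have been fixed by keeping flow { } and adding the missing emit(). The sketch below assumes kotlinx.coroutines only. The renderCompass() function and the renderCalls counter are hypothetical, added purely to show when the rendering runs.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Counts how often the (hypothetical) renderer has run.
var renderCalls = 0

// Hypothetical stand-in for the real raster export.
fun renderCompass(): ByteArray {
    renderCalls++
    return byteArrayOf(1, 2, 3)
}

// Same shape as the original service, with the missing emit() added.
fun compassFlow(): Flow<ByteArray> = flow {
    emit(renderCompass())
}

// Builds a fresh flow, collects it, and returns the emitted chunks.
fun collectCompass(): List<ByteArray> = runBlocking { compassFlow().toList() }

fun main() {
    val cold = compassFlow()
    println("calls after building: $renderCalls")
    val chunks = runBlocking { cold.toList() }
    println("calls after collecting: $renderCalls, chunks: ${chunks.size}")
}
```

One difference from flowOf(bytes): here the rendering is deferred until the flow is collected, whereas flowOf needs the bytes to exist before the flow is built. Which of the two fits better depends on whether the work should run eagerly or only when the response is written.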