我正试图将CompletableFuture<Optional<T>>
转换为Flow<T?>
。我要写的扩展函数是
fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> =
this.toMono().map { (if (it.isPresent) it.get() else null) }.asFlow()
但是它失败了,因为asFlow()
不存在于可空类型,AFAICT基于它的定义。
那么,如何将CompletableFuture<Optional<T>>
转换为Flow<T?>
呢?
编辑1:
这是我到目前为止想到的。感谢反馈。
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import java.util.Optional
import java.util.concurrent.CompletableFuture
fun <T> Optional<T>.orNull(): T? = orElse(null)
fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> = flowOf(this.join().orNull())
仅供参考,在我的情况下,这是使用Axon的Kotlin扩展queryOptional
,我现在可以这样写:
inline fun <reified R, reified Q> findById(q: Q, qgw: QueryGateway): Flow<R?> {
return qgw.queryOptional<R, Q>(q).asFlowOfNullable()
}
我将推迟一段时间,用上述模式创建一个注释作为答案,以允许反馈。
编辑2:既然下面指出了编辑1中的asFlowOfNullable
会阻塞线程,我现在就从@Joffrey开始:
fun <T> Optional<T>.orNull(): T? = orElse(null)
fun <T> CompletableFuture<Optional<T>>.asDeferredOfNullable(): Deferred<T?> = thenApply { it.orNull() }.asDeferred()
编辑3:感谢@Tenfour04 &@Joffrey,感谢他们的帮助。:)
要使用下面的扩展,您需要jdk8
协程库:
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:$1.5.0"
我不确定你正在使用的asFlow()
函数来自哪里,但我认为没有它就可以工作。对我来说,有一个单一项目的流似乎有点奇怪,因为它可能只是一个suspend
函数,或者如果你需要它作为一个对象来传递,一个Deferred,它旨在返回一个结果,因此更类似于一个未来而不是一个流。
fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> =
flow { emit(await().orElse(null)) }
作为挂起函数:
suspend fun <T> CompletableFuture<Optional<T>>.awaitNullable(): T? =
await().orElse(null))
作为延期:
fun <T> CompletableFuture<Optional<T>>.asDeferredNullable(): Deferred<T?> =
thenApply { it.orElse(null) }.asDeferred()