在 Kotlin 中,如何将"CompletableFuture<Optional<T>>"转换为"Flow<T?>"?



我正试图将CompletableFuture<Optional<T>>转换为Flow<T?>。我要写的扩展函数是

fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> =
this.toMono().map { (if (it.isPresent) it.get() else null) }.asFlow()

但是它失败了,因为asFlow()不存在于可空类型,AFAICT基于它的定义。

那么,如何将CompletableFuture<Optional<T>>转换为Flow<T?>呢?

编辑1:

这是我到目前为止想到的。感谢反馈。

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import java.util.Optional
import java.util.concurrent.CompletableFuture
fun <T> Optional<T>.orNull(): T? = orElse(null)
fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> = flowOf(this.join().orNull())

仅供参考,在我的情况下,这是使用Axon的Kotlin扩展queryOptional,我现在可以这样写:

inline fun <reified R, reified Q> findById(q: Q, qgw: QueryGateway): Flow<R?> {
return qgw.queryOptional<R, Q>(q).asFlowOfNullable()
}

我将推迟一段时间,用上述模式创建一个注释作为答案,以允许反馈。

编辑2:既然下面指出了编辑1中的asFlowOfNullable会阻塞线程,我现在就从@Joffrey开始:

fun <T> Optional<T>.orNull(): T? = orElse(null)
fun <T> CompletableFuture<Optional<T>>.asDeferredOfNullable(): Deferred<T?> = thenApply { it.orNull() }.asDeferred()

编辑3:感谢@Tenfour04 &@Joffrey,感谢他们的帮助。:)

要使用下面的扩展,您需要jdk8协程库:

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-jdk8:$1.5.0"

我不确定你正在使用的asFlow()函数来自哪里,但我认为没有它就可以工作。对我来说,有一个单一项目的流似乎有点奇怪,因为它可能只是一个suspend函数,或者如果你需要它作为一个对象来传递,一个Deferred,它旨在返回一个结果,因此更类似于一个未来而不是一个流。

fun <T> CompletableFuture<Optional<T>>.asFlowOfNullable(): Flow<T?> =
flow { emit(await().orElse(null)) }

作为挂起函数:

suspend fun <T> CompletableFuture<Optional<T>>.awaitNullable(): T? = 
await().orElse(null))

作为延期:

fun <T> CompletableFuture<Optional<T>>.asDeferredNullable(): Deferred<T?> =
thenApply { it.orElse(null) }.asDeferred()

最新更新