如何跳过Flow并只发送最后发出的值



我正在开发一个web应用程序,以可视化基于Websockets的迷宫生成和求解算法。生成算法实现接口IMazeGenerator并返回Flow。

interface IMazeGenerator {
fun generate(maze: Maze): Flow<Maze>
}

在生成器算法中的每个步骤之后,都会发出更新后的迷宫。收集器然后在短暂延迟后将中间步骤发送到客户端。

generatorFlow
.map { it.toDto() }
.delay(150)
.onEach {
client.send(UpdateMaze(it))
}
.launchIn(this.scope)

现在我的问题是:我想为客户提供跳过中间步骤,然后只发送最后一个结果,即最终迷宫的可能性。要做到这一点,我首先需要能够确定发射器是否已经完成(即,最终的迷宫已经发射(,如果是,则在收到跳过命令时只发送最后一个结果。

不幸的是,我现在根本不知道,我甚至不确定这是否适用于流。欢迎任何帮助。

感谢marstran,不幸的是,这还没有直接奏效,但你给了我必要的灵感。让我先解释一下这个问题,然后再解释我的解决方案。

一旦触发生成,流就会立即启动。如果客户端希望在某个点之后跳过所有进一步的中间步骤,我需要从这个流中传输最后一个发射。对于您的解决方案,问题是在这种情况下,我会重新启动流,并得到不同的结果,因为有些算法是不确定的。另一个问题是,我首先必须确保发射器是否已经计算出最后的结果,因为有些算法确实需要一些时间。在这一点上,流动是可以跳过的。因此,我还不得不使用一个缓冲区来省略背压。

我现在创建了一个扩展函数,它接收上一个发出值的回调函数。

fun <T> Flow<T>.finalValueCallback(block: suspend (T?) -> Unit) = flow {
var finalValue: T? = null
collect {
emit(it)
finalValue = it
}
block(finalValue)
}

然后我使用这个回调来保存当前流的最后一个结果。如果出现跳过命令,我会中断流程,只发送最后一个结果。

generatorJob = generatorFlow
.onStart { client.send(UpdateGeneratorState(GeneratorState.RUNNING)) }
.map { it.toDto() }
.finalValueCallback {
finalMaze = it
client.send(UpdateGeneratorState(GeneratorState.SKIPPABLE))
}
.buffer()
.delay(150)
.onEach {
client.send(UpdateMaze(it))
}
.onCompletion { cause ->
finalValue = null
if (cause == null || cause is FlowSkippedException) {
client.send(UpdateGeneratorState(GeneratorState.COMPLETED))
} else {
client.send(UpdateGeneratorState(GeneratorState.CANCELLED))
}
}
.launchIn(this.scope)

您可以创建一个扩展函数来收集流并返回最后一个元素。请注意,如果输入流是无限的,它将无限期地挂起。

类似这样的东西:

suspend fun <T> Flow<T>.lastOrNull(): T? {
var elem: T? = null
collect { elem = it }
return elem
}

现在客户端只需执行val finalMaze = mazeSteps.lastOrNull()即可获得最终迷宫。

如果你想将输入流转换为只发出最终迷宫的流,你可以这样做:

fun <T> Flow<T>.skipUntilLast(): Flow<T> = flow { 
val last = lastOrNull()
if (last != null) emit(last)
}

最新更新