Kotlin-如何从流中收集X值



假设我有一个不断发送更新的流,如下所示:

locationFlow = StateFlow<Location?>(null)

我有一个用例,在一个特定事件发生后,我想从流中收集X值并继续,就像下面的一样。我知道collect是一个终端运算符,所以我认为下面的逻辑不起作用,但在这种情况下我该怎么做呢?我想收集X个项目,保存它们,然后将它们发送到另一个函数进行处理。

fun onEventOccurred() {
launch {
val locations = mutableListOf<Location?>()
locationFlow.collect {
//collect only X locations
locations.add(it)
}
saveLocations(locations)
}
}

像这样的东西有预先存在的Kotlin函数吗?我想从流中收集X次,将项目保存到列表中,并将该列表传递给另一个函数。

collect是终端并不重要。上游StateFlow将保持正常运行,因为StateFlow不关心它们的收集器在做什么。您可以使用take函数来获取特定数量的项目,也可以使用toList()(另一个终端函数(在它们准备好后简明地将它们复制到列表中。

fun onEventOccurred() {
launch {
saveLocations(locationFlow.take(5).toList())
}
}

如果我正确理解了您的用例,您希望:

  • 丢弃元素,直到发送特定的元素——实际上,在重读你的问题后,我认为情况并非如此。。我把它留在示例中,仅供参考
  • 当这种情况发生时,您希望收集X个项目以进行进一步处理

假设这是正确的,您可以使用dropWhiletake的组合,如下所示:

fun main() = runBlocking {
val messages = flow {
repeat(10) {
println(it)
delay(500)
emit(it)
}
}
messages
.dropWhile { it < 5 }
.take(3)
.collect { println(it) } // prints 5, 6, 7
}

你甚至可以有更复杂的逻辑,即丢弃任何小于5的数字,然后取前10个偶数:

fun main() = runBlocking {
val messages = flow {
repeat(100) {
delay(500)
emit(it)
}
}
messages
.dropWhile { it < 5 }
.filter { it % 2 == 0}
.take(10)
.collect { println(it) } // prints even numbers, 6 to 24
}

最新更新