假设我有一个不断发送更新的流,如下所示:
locationFlow = StateFlow<Location?>(null)
我有一个用例,在一个特定事件发生后,我想从流中收集X值并继续,就像下面的一样。我知道collect
是一个终端运算符,所以我认为下面的逻辑不起作用,但在这种情况下我该怎么做呢?我想收集X个项目,保存它们,然后将它们发送到另一个函数进行处理。
fun onEventOccurred() {
launch {
val locations = mutableListOf<Location?>()
locationFlow.collect {
//collect only X locations
locations.add(it)
}
saveLocations(locations)
}
}
像这样的东西有预先存在的Kotlin函数吗?我想从流中收集X次,将项目保存到列表中,并将该列表传递给另一个函数。
collect
是终端并不重要。上游StateFlow将保持正常运行,因为StateFlow不关心它们的收集器在做什么。您可以使用take
函数来获取特定数量的项目,也可以使用toList()
(另一个终端函数(在它们准备好后简明地将它们复制到列表中。
fun onEventOccurred() {
launch {
saveLocations(locationFlow.take(5).toList())
}
}
如果我正确理解了您的用例,您希望:
- 丢弃元素,直到发送特定的元素——实际上,在重读你的问题后,我认为情况并非如此。。我把它留在示例中,仅供参考
- 当这种情况发生时,您希望收集X个项目以进行进一步处理
假设这是正确的,您可以使用dropWhile
和take
的组合,如下所示:
fun main() = runBlocking {
val messages = flow {
repeat(10) {
println(it)
delay(500)
emit(it)
}
}
messages
.dropWhile { it < 5 }
.take(3)
.collect { println(it) } // prints 5, 6, 7
}
你甚至可以有更复杂的逻辑,即丢弃任何小于5的数字,然后取前10个偶数:
fun main() = runBlocking {
val messages = flow {
repeat(100) {
delay(500)
emit(it)
}
}
messages
.dropWhile { it < 5 }
.filter { it % 2 == 0}
.take(10)
.collect { println(it) } // prints even numbers, 6 to 24
}