如何从非流函数中获得流输出



我想要非流函数(返回类型T(的流输出(返回类型为Flow<T>(。

fun getTotalFiles(): Int 
// Say, This is a library function it'll return the number of files (Int) in that folder at that specific moment.
//And,
fun getAllFiles(): List<File> 
// Say, This is a library function it'll return all the files (List<File>) in that folder.

该文件夹中的文件将来可能也会更改

现在,我想不断地观察输出,那么我该如何实现它呢?

fun getFlowOfTotalFiles(): Flow<Int> =
// A wrapper function that converts the library function return type to an observable flow, Flow<Int>
//And,
fun getFlowOfAllFiles(): Flow<List<File>> =
// A wrapper function that converts the library function return type to an observable flow, Flow<List<File>>

为了专门监视目录中的文件,可以使用WatchService并使用flow生成器将其转换为流。类似这样的东西:

fun getDirectoryMonitorFlow(directory: String) = flow {
FileSystems.getDefault().newWatchService().use { watchService ->
while (true) {
val watchKey = Path.of(directory).register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
if (watchKey.pollEvents().isNotEmpty()) {
emit(Unit)
}
yield() // give flow opportunity to be cancelled.
if (!watchKey.reset()) {
println("Directory became unreadable. Finishing flow.")
break
}
}
}
}
.catch { println("Exception while monitoring directory.") }
.flowOn(Dispatchers.IO)

然后你的课可能看起来像:

fun getFlowOfTotalFiles(): Flow<Int> = getFlowOfAllFiles()
.map { it.size }
.distinctUntilChanged()
fun getFlowOfAllFiles(): Flow<List<File>> = flow {
emit(Unit) // so current state is always emitted
emitAll(getDirectoryMonitorFlow(directory))
}
.map {
File(directory).listFiles()?.toList().orEmpty()
}
.flowOn(Dispatchers.IO)
.distinctUntilChanged()

尽管您可能会考虑将第一个流设为专用SharedFlow,这样您就不会同时运行多个WatchServices来监视同一目录。

我认为您需要在流生成器中有一个无限循环,如下所示:

fun getFlowOfTotalFiles(): Flow<Int> = flow {
while (true) {
emit(getTotalFiles())
// delays for 5 sec before next request and 
// terminates the infinite cycle when a coroutine, 
// that collects this Flow, is canceled
delay(5000) 
}
}
fun getAllFilesFlow(): Flow<List<File>> = flow {
while (true) {
emit(getAllFiles())
delay(5000)
}
}

最新更新