Kotlin Flow脱机缓存



我是kotlin流的新手,我正在处理这个文档。Kotlin流量。在这段代码中,数据源每隔五秒钟从api中获取数据并发出它

这是我的示例数据源类

我正在获取数据并将其发送出去。

class RemoteDataSourceImpl @Inject constructor(
private val api:CryptoApi
): RemoteDataSource {

override suspend fun cryptoList(): Flow<List<CryptoCoinDto>> {
return flow {
while (true){

val data = api.getCoinList()
emit(data)
delay(5000L)
}
}
}
}

这是我的示例存储库

我正在映射数据并将其保存到房间数据库中。由于真理的单一来源原则,我想从房间数据库中获取数据并发送它,但我仍然必须返回dataSource,因为如果我打开新的流{},我就无法访问数据源的数据。当然,我可以通过使用List而不是Flow<列表>RemoteDataSource类内部。但我想了解这个例子。我如何在这里应用单一的真理来源。

class CoinRepositoryImpl @Inject constructor(
private val dataSource:RemoteDataSource,
private val dao: CryptoDao
):CoinRepository {
override fun getDataList(): Flow<List<CryptoCoin>> {
dataSource.cryptoList().map { dtoList ->
val entityList = dtoList.map { dto ->
dto.toCryptoEntity()
}
dao.insertAll(entityList)
}
return dataSource.cryptoList().map {
it.map { it.toCryptoCoin() }
}
}

也许这会有所帮助。我个人不会为API的东西返回一个流,因为从技术上讲,你总是只返回一个列表

override fun getDataList(): Flow<List<CryptoCoin>> {
val localFlow = dao.getCryptoCoins()
val apiFlow = flow<Unit> {
// Since getDataList can't be suspend due to it returning a flow, we create a new flow simply to run a suspendable function
getAndSaveCryptoCoinsFromApi()
}.onStart { // If I don't add this onStart, then it won't work
emit(Unit)
}

return localFlow.combine(apiFlow) { dbItems, _ ->
dbItems
// We just return DB items. Why? Since we are saving api items to the db, and since the db items return a flow, it will automatically emit values to the flow once new items are added to the db
}
}
private suspend fun getAndSaveCryptoCoinsFromApi() {
try {
val result = api.getCryptoCoins().toDomain()
val entities = result.map { it.toEntity() }
dao.insertCoins(entities)
} catch(e: Exception) { /**/ }
}

这实际上比看起来更复杂。流量的设计是为了支持背压,这意味着它们通常只在消费时按需生产物品。它们是被动的,不是推送项目,而是从流中拉出项目。

(免责声明:这对冷流来说都是正确的,而对热流来说则不然。但cryptoList()是冷流。(

当消费者比生产商慢或者根本没有人消费商品时,它的设计可以极大地简化这种情况。然后制片人停止了制作,一切都很好。

在您的案例中,有两个消费者,因此这再次变得更加复杂。如果一个消费者比另一个慢,你需要决定该怎么办。例如,如果没有人从getDataList()收集数据,该怎么办?有多种选择,每种选择都需要一种不同的方法:

  1. 停止使用源流,因此停止更新数据库
  2. 如果没有人从getDataList()收集,则始终更新数据库并对项目进行排队。如果队列中的项目越来越多怎么办
  3. 一直更新数据库,如果没有人从getDataList()收集,则丢弃项目

Ad.1.

可以使用onEach((:

return dataSource.cryptoList().onEach {
// update db
}.map {
it.map { it.toCryptoCoin() }
}

在该解决方案中,更新数据库是一种;副作用";消耗CCD_ 5流。

Ad.2.和Ad.3.

在这种情况下,我们不能被动地等到有人向我们索要物品。我们需要积极消费来源流中的物品,并将其推向下游流。因此,我们需要一个热流:SharedFlow。此外,由于我们在这种情况下仍然是活跃的一方,我们必须启动一个协同程序,在后台完成这项工作。所以我们需要一个CoroutineScope

解决方案取决于您的具体需求:您是否需要队列,如果队列超过大小限制会发生什么,等等,但它将类似于:

return dataSource.cryptoList().onEach {
// update db
}.map {
it.map { it.toCryptoCoin() }
}.shareIn(scope, SharingStarted.Eagerly)

您还可以阅读有关buffer((和MutableSharedFlow的内容,它们可能对您有用。

最新更新