我是kotlin流的新手,我正在处理这个文档。Kotlin流量。在这段代码中,数据源每隔五秒钟从api中获取数据并发出它
这是我的示例数据源类
我正在获取数据并将其发送出去。
class RemoteDataSourceImpl @Inject constructor(
private val api:CryptoApi
): RemoteDataSource {
override suspend fun cryptoList(): Flow<List<CryptoCoinDto>> {
return flow {
while (true){
val data = api.getCoinList()
emit(data)
delay(5000L)
}
}
}
}
这是我的示例存储库
我正在映射数据并将其保存到房间数据库中。由于真理的单一来源原则,我想从房间数据库中获取数据并发送它,但我仍然必须返回dataSource,因为如果我打开新的流{},我就无法访问数据源的数据。当然,我可以通过使用List而不是Flow<列表>RemoteDataSource类内部。但我想了解这个例子。我如何在这里应用单一的真理来源。
class CoinRepositoryImpl @Inject constructor(
private val dataSource:RemoteDataSource,
private val dao: CryptoDao
):CoinRepository {
override fun getDataList(): Flow<List<CryptoCoin>> {
dataSource.cryptoList().map { dtoList ->
val entityList = dtoList.map { dto ->
dto.toCryptoEntity()
}
dao.insertAll(entityList)
}
return dataSource.cryptoList().map {
it.map { it.toCryptoCoin() }
}
}
也许这会有所帮助。我个人不会为API的东西返回一个流,因为从技术上讲,你总是只返回一个列表
override fun getDataList(): Flow<List<CryptoCoin>> {
val localFlow = dao.getCryptoCoins()
val apiFlow = flow<Unit> {
// Since getDataList can't be suspend due to it returning a flow, we create a new flow simply to run a suspendable function
getAndSaveCryptoCoinsFromApi()
}.onStart { // If I don't add this onStart, then it won't work
emit(Unit)
}
return localFlow.combine(apiFlow) { dbItems, _ ->
dbItems
// We just return DB items. Why? Since we are saving api items to the db, and since the db items return a flow, it will automatically emit values to the flow once new items are added to the db
}
}
private suspend fun getAndSaveCryptoCoinsFromApi() {
try {
val result = api.getCryptoCoins().toDomain()
val entities = result.map { it.toEntity() }
dao.insertCoins(entities)
} catch(e: Exception) { /**/ }
}
这实际上比看起来更复杂。流量的设计是为了支持背压,这意味着它们通常只在消费时按需生产物品。它们是被动的,不是推送项目,而是从流中拉出项目。
(免责声明:这对冷流来说都是正确的,而对热流来说则不然。但cryptoList()
是冷流。(
当消费者比生产商慢或者根本没有人消费商品时,它的设计可以极大地简化这种情况。然后制片人停止了制作,一切都很好。
在您的案例中,有两个消费者,因此这再次变得更加复杂。如果一个消费者比另一个慢,你需要决定该怎么办。例如,如果没有人从getDataList()
收集数据,该怎么办?有多种选择,每种选择都需要一种不同的方法:
- 停止使用源流,因此停止更新数据库
- 如果没有人从
getDataList()
收集,则始终更新数据库并对项目进行排队。如果队列中的项目越来越多怎么办 - 一直更新数据库,如果没有人从
getDataList()
收集,则丢弃项目
Ad.1.
可以使用onEach((:
return dataSource.cryptoList().onEach {
// update db
}.map {
it.map { it.toCryptoCoin() }
}
在该解决方案中,更新数据库是一种;副作用";消耗CCD_ 5流。
Ad.2.和Ad.3.
在这种情况下,我们不能被动地等到有人向我们索要物品。我们需要积极消费来源流中的物品,并将其推向下游流。因此,我们需要一个热流:SharedFlow。此外,由于我们在这种情况下仍然是活跃的一方,我们必须启动一个协同程序,在后台完成这项工作。所以我们需要一个CoroutineScope
。
解决方案取决于您的具体需求:您是否需要队列,如果队列超过大小限制会发生什么,等等,但它将类似于:
return dataSource.cryptoList().onEach {
// update db
}.map {
it.map { it.toCryptoCoin() }
}.shareIn(scope, SharingStarted.Eagerly)
您还可以阅读有关buffer((和MutableSharedFlow的内容,它们可能对您有用。