Coroutines Channel Value EmafeEach等待而不是当时消耗一次



我在弄清楚如何使用频道时遇到问题,我希望它们在发送后立即将其推向消费者,而是在加载了两个源的数据后,我将获得值。

mainActivity.kt

fun loadData() {
    textView.text = "LOADING"
    launch {
        repository.loadData().consumeEach { loaded ->
            withContext(Dispatchers.Main) {
                logd("Presenting: ${loaded.size}, $loaded")
                textView.text = loaded.joinToString { "$itn" }
            }
        }
    }

repository.kt

suspend fun loadData(): ReceiveChannel<List<String>> {
    return coroutineScope {
        produce(capacity = 2) {
            launch {
                val localData = local.loadData()
                send(localData)
            }
            launch {
                val remoteData = remote.loadData()
                send(remoteData)
            }
        }
    }
}

remote.kt

override val data: MutableList<String> = mutableListOf("R1", "R2", "R3", "R4", "R5")
override suspend fun loadData(): List<String> {
    logd("Loading remote started")
    val wait = Random.nextLong(0, 500)
    delay(wait)
    logd("Remote loading took $wait")
    logd("Loading remote finished: ${data.size}, $data")
    return data
}

local.kt

override val data: MutableList<String> = mutableListOf("L1", "L2", "L3", "L4", "L5")
override suspend fun loadData(): List<String> {
    logd("Loading local started")
    val wait = Random.nextLong(1000, 2000)
    delay(wait)
    logd("Local loading took $wait")
    logd("Loading local finished: ${data.size}, $data")
    return data
}

我在控制台内部

D/Local: Loading local started
D/Remote: Loading remote started
D/Remote: Remote loading took 265
D/Remote: Loading remote finished: 5, [R1, R2, R3, R4, R5]
D/Local: Local loading took 1650
D/Local: Loading local finished: 5, [L1, L2, L3, L4, L5]
D/DispatchedCoroutine: Presenting: 5, [R1, R2, R3, R4, R5]
D/DispatchedCoroutine: Presenting: 5, [L1, L2, L3, L4, L5]

这看起来像是在达到容量后发出的两个来源的数据。我期望的是,消费者在发送数据后可以立即接收。因此,控制台输出看起来更像是这样。

D/Local: Loading local started
D/Remote: Loading remote started
D/Remote: Remote loading took 265
D/DispatchedCoroutine: Presenting: 5, [R1, R2, R3, R4, R5]
D/Remote: Loading remote finished: 5, [R1, R2, R3, R4, R5]
D/Local: Local loading took 1650
D/Local: Loading local finished: 5, [L1, L2, L3, L4, L5]
D/DispatchedCoroutine: Presenting: 5, [L1, L2, L3, L4, L5]

使用coroutine.channel?


编辑#1:

Repository#loadData()删除coroutineScope{...}后,它开始按预期工作。但是现在我有一个问题,即我必须将范围作为一个函数参数,这对我来说超级丑陋。

repository.kt

suspend fun loadData(scope: CoroutineScope): ReceiveChannel<List<String>> {
    return scope.produce(capacity = 2) {
        launch {
            val localData = local.loadData()
            send(localData)
        }
        launch {
            val remoteData = remote.loadData()
            send(remoteData)
        }
        invokeOnClose {
            logd("Closing channel")
        }
    }
}

我认为您的代码可以做您期望做的事情。我认为您遇到的问题是,登录在发生时没有到达您的控制台。请记住,记录本身具有自己的缓冲和IO线程。我尝试了您的代码并使用了println,因此我得到了您的预期行为。要确认,您可以不必随机等待,而是将等待时间增加到每个秒的10秒钟,然后真正使它们实现1个接一个。只是为了帮助您自己确认这一点,这是我尝试做的非Android版本:

    fun main() = runBlocking {
    val start = System.currentTimeMillis()
    launch(Dispatchers.Unconfined) {
        loadData().consumeEach { loaded ->
            println("Presenting: ${loaded.size}, $loaded")
        }
    }.join()
    println("The whole thing took ${System.currentTimeMillis() - start}")
}
suspend fun CoroutineScope.loadData() = produce {
    launch {
        val localData = localloadData()
        send(localData)
    }
    launch {
        val remoteData = remoteloadData()
        send(remoteData)
    }
}
val remoteData: MutableList<String> = mutableListOf("R1", "R2", "R3", "R4", "R5")
suspend fun remoteloadData(): List<String> {
    println("Loading remote started")
    val wait = 500L
    delay(wait)
    println("Remote loading took $wait")
    println("Loading remote finished: ${remoteData.size}, $remoteData")
    return remoteData
}
val localData: MutableList<String> = mutableListOf("L1", "L2", "L3", "L4", "L5")
suspend fun localloadData(): List<String> {
    println("Loading local started")
    val wait = 1000L
    delay(wait)
    println("Local loading took $wait")
    println("Loading local finished: ${localData.size}, $localData")
    return localData
}

它产生了以下内容:

Loading local started
Loading remote started
Remote loading took 500
Loading remote finished: 5, [R1, R2, R3, R4, R5]
Presenting: 5, [R1, R2, R3, R4, R5]
Local loading took 1000
Loading local finished: 5, [L1, L2, L3, L4, L5]
Presenting: 5, [L1, L2, L3, L4, L5]
The whole thing took 1046

编辑:我删除了您更新价值的withContext(Dispatchers.Main) - 确实不需要。您已经在此阶段完成了异步工作。相反,您需要在顶级launch中指定上下文,因为现在确实如此。

除非您另有说明,否则其余工作应继承该上下文。无需继续传递上下文作为参数。

如果您确实发现自己处于其他上下文取代继承的上下文的位置,那么可能会将其作为参数可能是一种方法,但是我的偏爱是找到工作,并以某种方式表达它确实从调用上下文继承。

相关内容

  • 没有找到相关文章

最新更新