Kotlin:如果没有人在阅读,则在 redezvous 频道上发送不会暂停



我不明白为什么这个程序没有结束:

object RedezvousExample {
@JvmStatic
fun main(args: Array<String>) {
setOptionToShowCoroutineNames()
runBlocking {
val ioChannel = Channel<String>()
val outputFunction: StringDestination = ChannelDestination(ioChannel, this)
val producerJob = launch(Dispatchers.Default) { producer(outputFunction) }
val producedValue = ioChannel.receive()
logMsg("Received $producedValue")
producerJob.cancel()
}
}
}
fun producer(output: StringDestination) {
for(i in 1..Int.MAX_VALUE) {
logMsg("Producing $i")
output("Iteration $i")
}
}
class ChannelDestination(val output: SendChannel<String>, val coroutineScope: CoroutineScope) : StringDestination {
override fun invoke(line: String) {
coroutineScope.async(Dispatchers.Default) {
logMsg("Sending $line")
output.send(line)
}
}
}
fun logMsg(msg: Any?) = println("${threadName()}$msg")

在我对会合频道的(糟糕的(理解中,我认为如果没有人读取频道,对 output.send 的调用将被阻塞,而制作者似乎会连续写入频道,即使没有挂起的接收方法。

谢谢

每次发送新值时都会创建一个新的协程。

operator fun invoke(line: String) {
coroutineScope.async(Dispatchers.Default) {
logMsg("Sending $line")
output.send(line)
}
}

这不会暂停,因为异步只是构建启动并忘记(但会延迟等待(。

显然output.send(line)行挂起,并释放将延续(异步块内的协程(添加到暂停状态的线程,并且没有发送值。但是logMsg()仍然通过 for 循环调用,因为 for 循环从未挂起。

为了克服这个问题,你的invoke()函数需要以某种方式挂起,直到 send(( 调用恢复,以便 for 循环挂起。您不能启动新的协程,因为它们无论如何都不会挂起。

// make producer suspend
suspend fun producer(output: ChannelDestination) {
for (i in 1..Int.MAX_VALUE) {
logMsg("Producing $i")
output("Iteration $i")
}
}
// make it suspend as well
suspend operator fun invoke(line: String) {
logMsg("Sending $line")
output.send(line)
}

您已经在Dispatchers.Default中启动了producer()函数,因此所有内容都将在其上运行并挂起,直到 sb 收到它。

@Animesh的回答非常好。我是这样阐述的:

object RedezvousExample01 {
@JvmStatic
fun main(args: Array<String>) {
setOptionToShowCoroutineNames()
runBlocking {
val ioChannel = Channel<String>()
val outputFunction: StringDestination = ChannelDestination(ioChannel)
launch { producer(outputFunction) }
val producedValue = ioChannel.receive()
ioChannel.close()
logMsg("Received $producedValue")
}
}
}
fun producer(output: StringDestination) {
try {
for (i in 1..Int.MAX_VALUE) {
logMsg("Producing $i")
output("Iteration $i")
}
} catch (e: ClosedSendChannelException) {}
}
class ChannelDestination(val output: SendChannel<String>) : StringDestination {
override fun invoke(line: String) {
runBlocking {
logMsg("Sending $line")
output.send(line)
}
}
}

无论如何,我不得不关闭通道才能结束协程,我猜为什么 job.cancel(( 方法不起作用。

最新更新