我需要在各种通道处理器之间插入一个包含10000个元素的缓冲区。
produce()
提供了一种配置缓冲区大小的方法:
produce(capacity = 10_000) {
}
然而,map
、filter
默认为会合通道:
fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context) { // No capacity specified, defaults to 0
consumeEach {
send(transform(it))
}
}
有没有办法对此进行配置?目前,我正在用缓冲区构建这些stdlib函数的自己版本,这不是很优雅。
唯一的方法是使用capacity
参数提供您自己的映射实现:
fun <E, R> ReceiveChannel<E>.map(context: CoroutineContext = Unconfined, capacity: Int = 0, transform: suspend (E) -> R): ReceiveChannel<R> =
produce(context, capacity = capacity) {
consumeEach {
send(transform(it))
}
}
我创建了https://github.com/Kotlin/kotlinx.coroutines/issues/841,但不会很快实施。当引入冷流以使热数据源和冷数据源之间的取消和范围层次结构一致时,所有通道运算符都可能被重新设计。