我正在尝试在 Kotlin 中为 Iterable 和 Sequence 实现并行实现。我得到了一个小文件,它由 4 个扩展函数组成,但第三个给了我一个编译器错误:
suspend fun <T, R> Iterable<T>.parallelMap(block: suspend(T) -> R) =
coroutineScope { map { async { block(it) } }.map { it.await() } }
suspend fun <T> Iterable<T>.parallelForEach(block: suspend (T) -> Unit) =
coroutineScope { map { async { block(it) } }.forEach { it.await() } }
suspend fun <T, R> Sequence<T>.parallelMap(block: suspend(T) -> R) =
coroutineScope { map { async { block(it) } }.map { it.await() } }
suspend fun <T> Sequence<T>.parallelForEach(block: suspend (T) -> Unit) =
coroutineScope { map { async { block(it) } }.forEach { it.await() } }
编译器回来后说,暂停函数只能在暂停函数内部调用。有没有办法实现这一点?
编辑:修复了错误的复制/粘贴
编辑2:我想到了一个实现:
suspend fun <T, R> Sequence<T>.parrallelMap(block: suspend (T) -> R) =
asIterable().map { coroutineScope { async { block(it) } } }
.asSequence().map { runBlocking { it.await() } }
我希望这会触发所有挂起功能并懒洋洋地等待它们。我只是不确定这是否安全,或者这是否节省时间。
延迟序列并行执行的核心语义存在问题。在迭代生成的序列之前,当前实现不会block(it)
启动:
suspend fun <T, R> Sequence<T>.parallelMap(block: suspend(T) -> R) =
coroutineScope { map { async { block(it) } }.map { it.await() } }
请考虑以下示例:
sequenceOf(1, 2, 3).parallelMap { it * it }.forEach { println(it) }
对于此示例,执行顺序为
val p1 = async { 1 * 1 }
val r1 = p1.await()
println(r1)
val p2 = async { 2 * 2 }
val r2 = p2.await()
println(r2)
val p3 = async { 3 * 3 }
val r3 = p3.await()
println(r3)
请注意,映射操作的执行是均衡的,而不是并行的。
编译器告诉您的是,Sequence<T>.map {}
的 lambda 是在调用上下文之外(阅读:在协程之外(按需延迟执行的,因此您无法使用当前所在的协程。
坦率地说,我不确定如何既执行惰性计算又并行执行。