如何在 R 中使用 'foreach' 和 '%dopar%' 和 'R6' 类?



我在尝试将%dopar%foreach()R6类一起使用时遇到了一个问题。 搜索四周,我只能找到与此相关的两个资源,一个未回答的 SO 问题和一个R6存储库上的未解决的 GitHub 问题。

在一个注释(即 GitHub 问题(中,建议通过将类的parent_env重新分配为 SomeClass$parent_env <- environment() 来解决方法。我想了解当这个表达式(即SomeClass$parent_env <- environment()(在foreach %dopar%内调用时,environment()到底指的是什么?

下面是一个最小的可重现示例:

Work <- R6::R6Class("Work",
    public = list(
        values = NULL,

        initialize = function() {
            self$values <- "some values"
        }
    )
)

现在,以下Task类在构造函数中使用 Work 类。

Task <- R6::R6Class("Task",
    private = list(
        ..work = NULL
    ),

    public = list(
        initialize = function(time) {
            private$..work <- Work$new()
            Sys.sleep(time)
        }
    ),

    active = list(
        work = function() {
            return(private$..work)
        }
    )
)

Factory 类中,创建 Task 类,并在 ..m.thread() 中实现foreach

Factory<- R6::R6Class("Factory",
    private = list(
        ..warehouse = list(),
        ..amount = NULL,
        ..parallel = NULL,

        ..m.thread = function(object, ...) {
            cluster <- parallel::makeCluster(parallel::detectCores() -  1)
            doParallel::registerDoParallel(cluster)
            private$..warehouse <- foreach::foreach(1:private$..amount, .export = c("Work")) %dopar% {
                # What exactly does `environment()` encapsulate in this context?
                object$parent_env <- environment()
                object$new(...) 
            }
            parallel::stopCluster(cluster)
        },

        ..s.thread = function(object, ...) {
            for (i in 1:private$..amount) {
               private$..warehouse[[i]] <- object$new(...)
            }
        },

        ..run = function(object, ...) {
            if(private$..parallel) {
                private$..m.thread(object, ...)
            } else {
                private$..s.thread(object, ...)
            }
        }
    ),

    public = list(
        initialize = function(object, ..., amount = 10, parallel = FALSE) {
            private$..amount = amount
            private$..parallel = parallel
            private$..run(object, ...)
        }
    ),

    active = list(
        warehouse = function() {
            return(private$..warehouse)
        }
    )
)

然后,它被称为:

library(foreach)
x = Factory$new(Task, time = 2, amount = 10, parallel = TRUE)

如果没有以下object$parent_env <- environment()行,它会抛出错误(即,如其他两个链接中所述(:Error in { : task 1 failed - "object 'Work' not found"

我想知道,(1(在foreach内部分配parent_env时有哪些潜在的陷阱,(2(为什么它首先有效?


更新 1:

  • 我从foreach()内部返回了environment(),以便private$..warehouse捕获这些环境
  • 在调试会话中使用rlang::env_print()(即,browser()语句是在foreach结束执行后立即放置的(,以下是它们的组成:
Browse[1]> env_print(private$..warehouse[[1]])
# <environment: 000000001A8332F0>
# parent: <environment: global>
# bindings:
#  * Work: <S3: R6ClassGenerator>
#  * ...: <...>
Browse[1]> env_print(environment())
# <environment: 000000001AC0F890>
# parent: <environment: 000000001AC20AF0>
# bindings:
#  * private: <env>
#  * cluster: <S3: SOCKcluster>
#  * ...: <...>
Browse[1]> env_print(parent.env(environment()))
# <environment: 000000001AC20AF0>
# parent: <environment: global>
# bindings:
#  * private: <env>
#  * self: <S3: Factory>
Browse[1]> env_print(parent.env(parent.env(environment())))
# <environment: global>
# parent: <environment: package:rlang>
# bindings:
#  * Work: <S3: R6ClassGenerator>
#  * .Random.seed: <int>
#  * Factory: <S3: R6ClassGenerator>
#  * Task: <S3: R6ClassGenerator>

免责声明:我在这里说的很多都是基于我所知道的有根据的猜测和推论, 我不能保证一切都是100%正确的。

我认为可能有很多陷阱, 哪一个适用实际上取决于你做什么。 我认为你的第二个问题更重要, 因为如果你明白这一点, 您将能够自己评估一些陷阱。

这个话题相当复杂, 但您可能可以从阅读 R 的词汇范围开始。 从本质上讲,R 具有一种环境层次结构, 当执行 R 代码时, 在当前环境中找不到其值的变量 (这是environment()返回的( 在环境中查找 (不要与调用方环境混淆(。

根据您链接的 GitHub 问题, R6生成器保存对其父环境的"引用", 他们希望他们的类可能需要的所有内容都可以在所述父级或环境层次结构中的某个地方找到, 从那个父母开始,然后"向上"。

您使用的解决方法起作用的原因是,您要将生成器的父环境替换为并行工作线程内部当前foreach调用中的父环境 (可能是不同的 R 进程,不一定是不同的线程(, 而且,鉴于您的.export规范可能会导出必要的值, 然后,R 的词法范围可以在单独的线程/进程中搜索从foreach调用开始的缺失值。

对于您链接的特定示例, 我发现一种更简单的方法来使其工作 (至少在我的 Linux 机器上( 是执行以下操作:

library(doParallel)
cluster <- parallel::makeCluster(parallel::detectCores() -  1)
doParallel::registerDoParallel(cluster)
parallel::clusterExport(cluster, setdiff(ls(), "cluster"))
x = Factory$new(Task, time = 1, amount = 3)

..m.thread功能保留为:

..m.thread = function(object, amount, ...) {
    private$..warehouse <- foreach::foreach(1:amount) %dopar% {
        object$new(...) 
    }
}

(并在完成后手动调用stopCluster(。

clusterExport调用应具有类似于 * 的语义: 从主R进程的全局环境中获取除cluster之外的所有内容, 并使它在每个并行辅助角色的全局环境中可用。 这样,当词法范围到达各自的全局环境时,foreach调用中的任何代码都可以使用生成器。 foreach可以很聪明,可以自动导出一些变量 (如 GitHub 问题所示(, 但它有局限性, 在词汇范围期间使用的层次结构可能会变得非常混乱。

*我说"类似于"是因为我不知道如果使用分叉,R 究竟做了什么来区分(全局(环境, 但由于需要出口, 我认为它们确实是相互独立的。

PS:如果你在函数调用中创建worker,我会使用调用来on.exit(parallel::stopCluster(cluster)), 这样,您可以避免在发生错误时以某种方式停止进程为止。

最新更新