这是我觉得很难理解的地方:
cl = makeCluster(rep("localhost", 8), "SOCK")
# This will not work, error: dat not found in the nodes
pmult = function(cl, a, x)
{
mult = function(s) s*x
parLapply(cl, a, mult)
}
scalars = 1:4
dat = rnorm(4)
pmult(cl, scalars, dat)
# This will work
pmult = function(cl, a, x)
{
x
mult = function(s) s*x
parLapply(cl, a, mult)
}
scalars = 1:4
dat = rnorm(4)
pmult(cl, scalars, dat)
# This will work
pmult = function(cl, a, x)
{
mult = function(s, x) s*x
parLapply(cl, a, mult, x)
}
scalars = 1:4
dat = rnorm(4)
pmult(cl, scalars, dat)
第一个函数不能工作,因为对参数求值是惰性的。但什么是惰性求值?当执行mult()时,它不需要对x进行评估吗?第二个是有效的,因为它强制x求值。现在,最奇怪的事情发生在第三个函数中,除了让mult()接收x作为额外的参数外,什么也没做,突然一切都工作了!
另一件事是,如果我不想在调用parapply()的函数中定义所有变量和函数,我应该怎么做?下面的代码肯定不能工作:
pmult = function(cl)
{
source("a_x_mult.r")
parLapply(cl, a, mult, x)
}
scalars = 1:4
dat = rnorm(4)
pmult(cl, scalars, dat)
我可以传递所有这些变量和函数作为参数:
f1 = function(i)
{
return(rnorm(i))
}
f2 = function(y)
{
return(f1(y)^2)
}
f3 = function(v)
{
return(v- floor(v) + 100)
}
test = function(cl, f1, f2, f3)
{
x = f2(15)
parLapply(cl, x, f3)
}
test(cl, f1, f2, f3)
或者我可以使用clusterExport(),但是当有很多对象要导出时,它会很麻烦。有没有更好的办法?
要理解这一点,您必须认识到每个函数都有一个相关联的环境,而这个环境是什么取决于函数是如何创建的。简单地在脚本中创建的函数与全局环境相关联,但由另一个函数创建的函数与创建函数的本地环境相关联。在您的示例中,pmult
创建了mult
,因此与mult
关联的环境包含形式参数cl
、a
和x
。
第一种情况的问题是parLapply
不知道x
的任何信息:它只是一个未求值的形式参数,被parLapply
序列化为mult
环境的一部分。由于mult
被序列化并发送给集群工作程序时,x
不会被评估,因此当工作程序执行mult
时,它会导致错误,因为dat
在该上下文中不可用。换句话说,当mult
计算x
时,已经太晚了。
第二种情况有效,因为x
在mult
序列化之前求值,因此x
的实际值与mult
的环境一起序列化。如果你知道闭包,但不知道惰性参数求值,它会做你所期望的事情。
第三种情况有效,因为您让parLapply
为您处理x
。这里面根本就没有诡计。
我应该警告你,在所有这些情况下,a
正在被评估(由parLapply
)并与mult
的环境一起序列化。parLapply
还将a
分割成块并将这些块发送给每个worker,因此mult
环境中的a
副本是完全不必要的。它不会导致错误,但它可能会损害性能,因为mult
被发送给每个任务对象中的工作者。幸运的是,这在parLapply
中不是什么问题,因为每个worker只有一个任务。如果clusterApply
或clusterApplyLB
的任务数等于a
的长度,则问题会严重得多。
我在书中的"snow"一章中讨论了一些与函数和环境相关的问题。这涉及到一些微妙的问题,而且很容易受到伤害,有时甚至没有意识到它发生了。
至于你的第二个问题,有各种策略可以将函数导出到worker,但有些人确实使用source
来定义worker上的函数,而不是使用clusterExport
。请记住,source
有一个local
参数,它控制解析表达式的求值位置,您可能需要指定脚本的绝对路径。最后,如果您使用的是远程集群工作器,如果您没有分布式文件系统,您可能需要将脚本scp到工作器。
下面是一个简单的方法,可以将全局环境中的所有函数导出到集群工作器:
ex <- Filter(function(x) is.function(get(x, .GlobalEnv)), ls(.GlobalEnv))
clusterExport(cl, ex)