带有未传递参数的Dask延迟函数调用



当使用dask.delayed调用依赖于参数的函数时,我试图更好地理解以下行为。当在configparser读取的参数文件中指定参数时,似乎会出现此问题。下面是一个完整的例子:

参数文件:

#zpar.ini: parameter file for configparser
[my pars]
my_zpar = 2.

解析器:

#zippy_parser
import configparser
def read(_rundir):
global rundir
rundir = _rundir
cp = configparser.ConfigParser()
cp.read(rundir + '/zpar.ini')
#[my pars]
global my_zpar
my_zpar = cp['my pars'].getfloat('my_zpar')

和主python文件:

# dask test with configparser
import dask
from dask.distributed import Client
import zippy_parser as zpar

def my_func(x, y):
# print stuff
print("parameter from main is: {}".format(main_par))
print("parameter from configparser is: {}".format(zpar.my_zpar))
# do stuff
return x + y

if __name__ == '__main__':
client = Client(n_workers = 4)
#read parameters from input file
rundir = '/path/to/parameter/file'
zpar.read(rundir)
#test zpar
print("zpar is {}".format(zpar.my_zpar))
#define parameter and call my_func
main_par = 5.
z = dask.delayed(my_func)(1., 2.)
z.compute()
client.close()

my_func((中的第一个print语句执行得很好,但第二个print语句引发了一个异常。输出为:

zpar is 2.0
parameter from main is: 5.0
distributed.worker - WARNING -  Compute Failed
Function:  my_func
args:      (1.0, 2.0)
kwargs:    {}
Exception: AttributeError("module 'zippy_parser' has no attribute 'my_zpar'",)

我是dask的新手。我想这和序列化有关,我不明白。有人能启发我和/或指出相关文件吗?谢谢

我会尽量保持简短。

当一个函数被串行化以发送给工作者时,python还会发送该函数所需的局部变量和函数(它的"闭包"(。然而,它按名称存储它引用的模块,它不会试图串行化整个运行时。这意味着zippy_parser在worker中被导入,而不是取消序列化。由于从未调用过函数read在worker中,global变量永远不会初始化。

因此,您可以在worker中调用read,将其作为函数的一部分或以其他方式调用,但函数的模式或设置模块全局变量可能不太好。Dask的延迟机制更喜欢功能的纯粹性,即您得到的结果不应该取决于运行时的当前状态。

(请注意,如果您在主脚本中调用read之后创建了客户端,那么工作程序可能已经获得了内存版本,这取决于如何在系统上配置子流程(

我鼓励您将所有参数显式传递给您的dask延迟函数,而不是依赖于全局命名空间。

最新更新