我有一个Python库,我有cythonized
使用这种方法。在所有的Spark集群节点上,我已经安装了whl
文件,如下所示。
pip install myapi-0.0.1-cp38-cp38-linux_x86_64.whl
当我像下面这样向Spark独立提交作业时,代码运行良好。
spark-submit
--master spark://172.18.0.32:7077
test.py
当我通过YARN与客户端部署模式提交作业时,代码也运行良好。
spark-submit
--master yarn
--deploy-mode client
test.py
然而,当我通过YARN集群部署模式提交作业时,代码中断了。
spark-submit
--master yarn
--deploy-mode cluster
test.py
特别是,我得到这个错误。
泡菜。PicklingError: Can't pickle
:属性查找lambda在myapi.
代码myapi.utils.Data
没有什么特别的,看起来像这样。
class Data:
def __init__(self, rdd):
self.rdd = rdd
def compute(self):
return self.rdd.map(change_it).reduce(lambda a, b: a + b)
def change_it(n):
a = lambda v: v
b = lambda v: v
c = lambda v: v
d = lambda v: v
e = lambda v: v
f = lambda v: a(b(c(d(e(v)))))
return f(n)
有几个地方讨论了pickle (web和SO)嵌套函数与PySpark,pickle
,cloudpickle
和cythonized模块的困难。然而,答案似乎不是blanket
的答案,这将解释为什么它会在一种情况下起作用,而不是在其他情况下,因为我已经在上面进行了实验。
如果能进一步解释我为什么会得到这些观察结果,我将不胜感激。
我的Spark环境设置如下:
- 火花v3.3.1 Hadoop v3.2.1Python v3.8
我对Spark不是很熟悉,但我希望"客户端"mode运行在同一个Python进程中,因此不需要序列化任何东西(对象可以简单地创建和使用)。"cluster"Mode大概是设计为在一堆不同的计算机上运行,因此需要将数据传递给pickle和unpickle的单独进程以分发它。
我相信cloudpickle
和dill
可以查看常规Python函数和lambda的内部,提取字节码,然后重建函数。这在Cython中显然是不可能的。在当前版本的Cython中,函数是按名称进行pickle的(因此lambda不可pickle,任何内部函数也不可pickle)。
你的选择是:
-
用可pickleable类(使用
__call__
函数)重写所有捕获变量的函数或lambda;用在函数或类作用域中定义的def
函数替换其他lambda -
尝试这个实验分支,它使大部分Cython函数可pickle。