PySpark Pipeline.fit(df) gives PicklingError: Could not serialize object: ValueError



I am working on the following dataset for a churn prediction problem: https://www.kaggle.com/jpacse/telecom-churn-new-cell2cell-dataset

I have built a distributed neural network model with pyspark, keras & Elephas, using a pyspark Pipeline.

When I fit the dataset in the pipeline, I get a PicklingError. I am building the model following this notebook: https://github.com/aviolante/pyspark_dl_pipeline/blob/master/pyspark_dl_pipeline.ipynb

The line in my code that raises the error is:

dl_pipeline.fit(train_data)

train_data contains two columns: "features" and "label". The "features" column is assembled with VectorAssembler; before assembling, all features are cast to float. The "label" column contains only 0s and 1s.
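For reference, the pipeline is wired up roughly as in the sketch below, following the linked notebook (PySpark 2.x setters, pre-1.0 Elephas API). The column names "features" and "label" match my data; the hyperparameters, optimizer settings and number of classes are placeholders rather than my exact values, and `model` stands for the compiled Keras network defined earlier:

from pyspark.ml import Pipeline
from elephas.ml_model import ElephasEstimator
from keras import optimizers

# Wrap the compiled Keras model (`model`, defined earlier) so Spark can train it.
estimator = ElephasEstimator()
estimator.setFeaturesCol("features")              # assembled float features
estimator.setLabelCol("label")                    # 0/1 churn label
estimator.set_keras_model_config(model.to_yaml()) # serialized model architecture
estimator.set_categorical_labels(True)
estimator.set_nb_classes(2)                       # churn vs. no churn
estimator.set_epochs(10)                          # placeholder hyperparameters
estimator.set_batch_size(128)
estimator.set_num_workers(1)
estimator.set_mode("synchronous")
estimator.set_loss("categorical_crossentropy")
estimator.set_metrics(["acc"])
estimator.set_optimizer_config(optimizers.serialize(optimizers.Adam(lr=0.01)))

dl_pipeline = Pipeline(stages=[estimator])
fitted_pipeline = dl_pipeline.fit(train_data)     # this is the call that raises the error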

Here is the PicklingError:

>>> Fit model
Traceback (most recent call last):
File "/usr/local/lib/python3.6/dist-packages/pyspark/serializers.py", line 597, in dumps
return cloudpickle.dumps(obj, 2)
File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 863, in dumps
cp.dump(obj)
File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 260, in dump
return Pickler.dump(self, obj)
File "/usr/lib/python3.6/pickle.py", line 409, in dump
self.save(obj)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 751, in save_tuple
save(element)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 406, in save_function
self.save_function_tuple(obj)
File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 549, in save_function_tuple
save(state)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 781, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.6/pickle.py", line 808, in _batch_appends
save(tmp[0])
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/local/lib/python3.6/dist-packages/pyspark/cloudpickle.py", line 657, in save_instancemethod
self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
File "/usr/lib/python3.6/pickle.py", line 610, in save_reduce
save(args)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 736, in save_tuple
save(element)
File "/usr/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 781, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.6/pickle.py", line 808, in _batch_appends
save(tmp[0])
File "/usr/lib/python3.6/pickle.py", line 521, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/python3.6/pickle.py", line 634, in save_reduce
save(state)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 821, in save_dict
self._batch_setitems(obj.items())
File "/usr/lib/python3.6/pickle.py", line 847, in _batch_setitems
save(v)
File "/usr/lib/python3.6/pickle.py", line 476, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/python3.6/pickle.py", line 781, in save_list
self._batch_appends(obj)
File "/usr/lib/python3.6/pickle.py", line 805, in _batch_appends
save(x)
File "/usr/lib/python3.6/pickle.py", line 496, in save
rv = reduce(self.proto)
File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/ops/resource_variable_ops.py", line 859, in __reduce__
name=self._shared_name,
File "/usr/local/lib/python3.6/dist-packages/tensorflow/python/ops/variables.py", line 1140, in _shared_name
return self.name[:self.name.index(":")]
ValueError: substring not found
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/pyspark/serializers.py in dumps(self, obj)
596         try:
--> 597             return cloudpickle.dumps(obj, 2)
598         except pickle.PickleError:
49 frames
ValueError: substring not found
During handling of the above exception, another exception occurred:
PicklingError                             Traceback (most recent call last)
/usr/local/lib/python3.6/dist-packages/pyspark/serializers.py in dumps(self, obj)
605                 msg = "Could not serialize object: %s: %s" % (e.__class__.__name__, emsg)
606             cloudpickle.print_exec(sys.stderr)
--> 607             raise pickle.PicklingError(msg)
608 
609 
PicklingError: Could not serialize object: ValueError: substring not found

Any guidance would be greatly appreciated. Thank you very much.

The solution that worked for me was found here:

https://github.com/maxpumperla/elephas/issues/151

I downgraded my keras and tensorflow versions with the following commands:

!pip install -q keras==2.2.4
!pip install -q tensorflow==1.14.0
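
After the downgrade (in Colab a runtime restart may be needed for the new versions to take effect), the installed versions can be confirmed with:

import keras
import tensorflow as tf

print(keras.__version__)   # should print 2.2.4
print(tf.__version__)      # should print 1.14.0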

The PicklingError went away after this.

This issue is also resolved in the latest 1.0.0 release of Elephas: https://github.com/danielenricocahall/elephas/releases/tag/1.0.0 It eliminates the incompatibility by using tensorflow.keras imports instead of separate keras and tensorflow packages.
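
In other words, with elephas 1.0.0 the model is defined through tensorflow.keras rather than the standalone keras package, along the lines of this sketch (layer sizes and the input dimension are placeholders, not my actual architecture):

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout

input_dim = 58  # placeholder: number of assembled feature columns

model = Sequential([
    Dense(128, activation="relu", input_shape=(input_dim,)),
    Dropout(0.3),
    Dense(64, activation="relu"),
    Dense(2, activation="softmax"),  # one-hot encoded 0/1 churn label
])
model.compile(optimizer="adam",
              loss="categorical_crossentropy",
              metrics=["accuracy"])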
