在 Spark 中广播用户定义的类



我正在尝试在 PySpark 应用程序中广播用户定义的变量,但我总是遇到以下错误:

File "/usr/local/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 174, in main
process()
File "/usr/local/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 169, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 268, in dump_stream
vs = list(itertools.islice(iterator, batch))
File "/home/.../sparkbroad.py", line 29, in <lambda>
output = input_.map(lambda item: b.value.map(item))
File "/usr/local/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/broadcast.py", line 106, in value
self._value = self.load(self._path)
File "/usr/local/spark-2.1.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/broadcast.py", line 97, in load
return pickle.load(f)
AttributeError: 'module' object has no attribute 'FooMap'

模块sparkbrad.py中的代码如下:

import random
import pyspark as spark
class FooMap(object):
def __init__(self):
keys = list(range(10))
values = [2 * key for key in keys]
self._map = dict(zip(keys, values))
def map(self, value):
if value not in self._map:
return -1
return self._map[value]

class FooMapJob(object):
def __init__(self, inputs):
self._inputs = inputs
self._foomap = FooMap()
def run(self):
sc = spark.SparkContext('local', 'FooMap')
input_ = sc.parallelize(self._inputs, 4)
b = sc.broadcast(self._foomap)
output = input_.map(lambda item: b.value.map(item))
b.unpersist()
result = list(output.toLocalIterator())
sc.stop()
return result

def main():
inputs = [random.randint(0, 10) for _ in range(10)]
job = FooMapJob(inputs)
print(job.run())
if __name__ == '__main__':
main()

我通过以下方式运行它:

:~$ spark-submit --master local[4] --py-files sparkbroad.py sparkbroad.py

我添加了--py-files参数,但看起来变化不大。不幸的是,我在网上找不到任何处理复杂类广播(只是列表或词典)的例子。任何提示都值得赞赏。提前谢谢。

更新:将FooMap类放在一个单独的模块中,即使没有--py-files指令,一切似乎都运行良好。

FooMap类放在一个单独的模块中,一切正常。

这是对上一个答案的补充。

您应该从另一个文件导入FooMap,而不仅仅是在当前文件中定义它

也许是这样的: 在foo_map.py:

class FooMap(object):
def __init__(self):
keys = list(range(10))
values = [2 * key for key in keys]
self._map = dict(zip(keys, values))
def map(self, value):
if value not in self._map:
return -1
return self._map[value]

然后在 sparkbrad.py

from foo_map import FooMap
class FooMapJob(object):
def __init__(self, inputs):
self._inputs = inputs
self._foomap = FooMap()
def run(self):
sc = spark.SparkContext('local', 'FooMap')
input_ = sc.parallelize(self._inputs, 4)
b = sc.broadcast(self._foomap)
output = input_.map(lambda item: b.value.map(item))
b.unpersist()
result = list(output.toLocalIterator())
sc.stop()
return result

def main():
inputs = [random.randint(0, 10) for _ in range(10)]
job = FooMapJob(inputs)
print(job.run())
if __name__ == '__main__':
main()

最新更新