我在此路径下保存了这两个文件:
c: code sample1 main.py
def method():
return "this is sample method 1"
c: code sample2 main.py
def method():
return "this is sample method 2"
然后我运行此:
from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession(sc)
sc.addPyFile("~/code/sample1/main.py")
main1 = __import__("main")
print(main1.method()) # this is sample method 1
sc.addPyFile("~/code/sample2/main.py") # Error
错误是
py4jjavaerror:调用o21.addfile时发生错误。。
这意味着它已经在其临时文件夹中具有" main.py"文件,并且内容不同。我想知道这种情况是否有任何解决方法,但是对我而言,我有这些限制:
- 文件名仍然必须为" main.py",只有文件夹才能是不同
- 可以以某种方式清除临时文件夹以添加aga
- 在另一个文件中,我拥有的唯一解决方案是附加随机字符串在main.py的前面,例如 abcdemain.py 和 fghijmain.py ,然后我将导入main = __import __(" abcdemain"(,但这并不是真正可取的
虽然在技术上可能是可能的,但通过将spark.files.overwrite
设置为 "true"
:
from pyspark import SparkConf, SparkContext
sc = SparkContext(conf=SparkConf().set("spark.files.overwrite", "true"))
,在简单的情况下将提供正确的结果
def f(*_):
from main import method
return [method()]
sc.addFile("/path/to/sample1/main.py")
sc.parallelize([], 3).mapPartitions(f).collect()
['this is sample method 1',
'this is sample method 1',
'this is sample method 1']
sc.addFile("/path/to/sample2/main.py")
sc.parallelize([], 3).mapPartitions(f).collect()
['this is sample method 2',
'this is sample method 2',
'this is sample method 2']
即使您在每个访问上进行reload
模块,并且会使您的应用程序难以理解,在实践中也不可靠。由于Spark可能会隐式缓存某些对象,或者透明地重新启动Python工人,您可以轻松地在某种情况下最终出现,在这种情况下,不同的节点会看到源的不同状态。
我认为这是b/c,也许它会将所有这些文件添加到最终的平坦目录结构中?因此,确实,它试图覆盖或将2件事以同一名称将其推入同一目录。尝试重命名它们,看看会发生什么。