pyspark覆盖添加了sc.addpyfile



我在此路径下保存了这两个文件:

c: code sample1 main.py

def method():
    return "this is sample method 1"

c: code sample2 main.py

def method():
    return "this is sample method 2"

然后我运行此:

from pyspark import SparkContext
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession(sc)
sc.addPyFile("~/code/sample1/main.py")
main1 = __import__("main")
print(main1.method()) # this is sample method 1
sc.addPyFile("~/code/sample2/main.py") # Error

错误是

py4jjavaerror:调用o21.addfile时发生错误。。

这意味着它已经在其临时文件夹中具有" main.py"文件,并且内容不同。我想知道这种情况是否有任何解决方法,但是对我而言,我有这些限制:

  1. 文件名仍然必须为" main.py",只有文件夹才能是不同
  2. 可以以某种方式清除临时文件夹以添加aga
  3. 在另一个文件中,我拥有的唯一解决方案是附加随机字符串在main.py的前面,例如 abcdemain.py fghijmain.py ,然后我将导入main = __import __(" abcdemain"(,但这并不是真正可取的

虽然在技术上可能是可能的,但通过将spark.files.overwrite设置为 "true"

from pyspark import SparkConf, SparkContext
sc = SparkContext(conf=SparkConf().set("spark.files.overwrite", "true"))

,在简单的情况下将提供正确的结果

def f(*_):                                                                   
    from main import method
    return [method()]
sc.addFile("/path/to/sample1/main.py") 
sc.parallelize([], 3).mapPartitions(f).collect()
['this is sample method 1',
'this is sample method 1',
'this is sample method 1']
sc.addFile("/path/to/sample2/main.py")
sc.parallelize([], 3).mapPartitions(f).collect()
['this is sample method 2',
 'this is sample method 2',
 'this is sample method 2']

即使您在每个访问上进行reload模块,并且会使您的应用程序难以理解,在实践中也不可靠。由于Spark可能会隐式缓存某些对象,或者透明地重新启动Python工人,您可以轻松地在某种情况下最终出现,在这种情况下,不同的节点会看到源的不同状态。

我认为这是b/c,也许它会将所有这些文件添加到最终的平坦目录结构中?因此,确实,它试图覆盖或将2件事以同一名称将其推入同一目录。尝试重命名它们,看看会发生什么。

最新更新