我遇到了一个问题,我的 pyspark 作业间歇性失败。 当我使用以下代码从目录中读取所有镶木地板文件时,它会抛出异常文件"C:\Python27\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py",第 717 行,read_int 提高 EOFError EOFError
inputPathWithData = []
if (path.exists(input)):
inputPathWithData.append(input + "\*.parquet")
if (len(inputPathWithData) > 0):
parquetFile = spark.read.parquet(*inputPathWithData)
parquetFile.createOrReplaceTempView("imp_parquetFile")
imp_df = spark.sql("SELECT * FROM imp_parquetFile as imp")
imp_df.write.json(output_path +"Single", compression="gzip")
但是,如果我单独读取文件,它都可以按预期工作
i = 0
for input_file in os.listdir(input) :
i+=1
parquetFile = spark.read.parquet(input + input_file)
parquetFile.createOrReplaceTempView("imp_parquetFile")
try:
imp_df = spark.sql("SELECT * FROM imp_parquetFile")
imp_df.write.json(output_path + '_ '+ str(i), compression="gzip")
except:
print("Issue with Filename: {0}".format(input_file))
初始化火花的代码是
spark = SparkSession
.builder
.appName("scratch_test")
.config("spark.debug.maxToStringFields", "100")
.config("spark.executor.memory", "10G")
.getOrCreate()
您不需要指定每个镶木地板文件。只需阅读整个文件夹。
df=spark.read.parquet("/folder/")