Pyspark 在读取目录中的所有镶木地板文件时失败,但在单独处理文件时成功



我遇到了一个问题,我的 pyspark 作业间歇性失败。 当我使用以下代码从目录中读取所有镶木地板文件时,它会抛出异常文件"C:\Python27\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers.py",第 717 行,read_int 提高 EOFError EOFError

inputPathWithData = []
if (path.exists(input)):
inputPathWithData.append(input + "\*.parquet")
if (len(inputPathWithData) > 0):
parquetFile = spark.read.parquet(*inputPathWithData)
parquetFile.createOrReplaceTempView("imp_parquetFile")
imp_df = spark.sql("SELECT  * FROM imp_parquetFile as imp")
imp_df.write.json(output_path +"Single", compression="gzip")

但是,如果我单独读取文件,它都可以按预期工作

i = 0
for input_file in os.listdir(input) :
i+=1
parquetFile = spark.read.parquet(input + input_file)
parquetFile.createOrReplaceTempView("imp_parquetFile")
try:
imp_df = spark.sql("SELECT * FROM imp_parquetFile")
imp_df.write.json(output_path + '_ '+ str(i), compression="gzip")
except:
print("Issue with Filename: {0}".format(input_file))

初始化火花的代码是

spark = SparkSession 
.builder 
.appName("scratch_test") 
.config("spark.debug.maxToStringFields", "100") 
.config("spark.executor.memory", "10G") 
.getOrCreate()

您不需要指定每个镶木地板文件。只需阅读整个文件夹。

df=spark.read.parquet("/folder/")

最新更新