Apache Spark 'overwrite' method in Azure Synapse not working



I have a handy function that lets me overwrite and rename the output file when saving query results to ADLS; see below:

from pyspark.sql import SparkSession
from notebookutils import mssparkutils  # Synapse filesystem utilities

spark = SparkSession.builder.appName('ops').getOrCreate()

def rename_file(origin_path, dest_path, file_type, new_name):
    filelist = mssparkutils.fs.ls(origin_path)  # list all files in the origin path
    filtered_filelist = [x.name for x in filelist if x.name.endswith("." + file_type)]  # keep only files of the requested type
    if len(filtered_filelist) > 1:  # more than one file of that type
        print("Too many " + file_type + " files. You will need a different implementation")
    elif len(filtered_filelist) == 0:  # no files of that type
        print("No " + file_type + " files found")
    else:
        # move the file to a new path (can be the same), renaming it in the process
        mssparkutils.fs.mv(origin_path + "/" + filtered_filelist[0], dest_path + "/" + new_name + "." + file_type)

I normally use this function on Databricks, where I use dbutils instead of mssparkutils.

As an example, this is how I call the function above:

df_staging_ccd_probate = "abfss://root@adlspretbiukadlsdev.dfs.core.windows.net/RAW/LANDING/"
myquery.coalesce(1).write.format("parquet").mode("overwrite").save(df_staging_ccd_probate+"/tempDelta")
rename_file(df_staging_ccd_probate+"/tempDelta",df_staging_ccd_probate,"parquet","filename")
mssparkutils.fs.rm(df_staging_ccd_probate+"/tempDelta",True)

On Databricks this works fine, but with Apache Spark in Azure Synapse I get the following error:

Py4JJavaError: An error occurred while calling z:mssparkutils.fs.mv.
: org.apache.hadoop.fs.PathExistsException: `abfss://root@adlspretbiukadlsdev.dfs.core.windows.net/RAW/LANDING/filename.parquet': File exists

For some reason, the "overwrite" method does not seem to work with Apache Spark in Synapse.

Can anyone tell me what the equivalent of "overwrite" is here, or am I missing something? Thanks.

Just to give you the full picture, here is the Databricks version:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('ops').getOrCreate()

def rename_file(origin_path, dest_path, file_type, new_name):
    filelist = dbutils.fs.ls(origin_path)  # list all files in the origin path
    filtered_filelist = [x.name for x in filelist if x.name.endswith("." + file_type)]  # keep only files of the requested type
    if len(filtered_filelist) > 1:  # more than one file of that type
        print("Too many " + file_type + " files. You will need a different implementation")
    elif len(filtered_filelist) == 0:  # no files of that type
        print("No " + file_type + " files found")
    else:
        # move the file to a new path (can be the same), renaming it in the process
        dbutils.fs.mv(origin_path + "/" + filtered_filelist[0], dest_path + "/" + new_name + "." + file_type)

On Databricks the following overwrites the file every time, so something must be behaving differently in Synapse:

myquery.coalesce(1).write.format("parquet").mode("overwrite").save(df_staging_ccd_probate+"/tempDelta")
rename_file(df_staging_ccd_probate+"/tempDelta",df_staging_ccd_probate,"parquet","filename")
dbutils.fs.rm(df_staging_ccd_probate+"/tempDelta",True)

You were close. This is how to move a file and allow it to overwrite the destination:

mssparkutils.fs.mv(source_path, dest_path, overwrite=True)
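
For reference, here is a minimal sketch of the question's rename_file with that flag applied; overwrite=True on the mv call is the only change, and it is what avoids the PathExistsException above:

def rename_file(origin_path, dest_path, file_type, new_name):
    filelist = mssparkutils.fs.ls(origin_path)
    filtered_filelist = [x.name for x in filelist if x.name.endswith("." + file_type)]
    if len(filtered_filelist) > 1:
        print("Too many " + file_type + " files. You will need a different implementation")
    elif len(filtered_filelist) == 0:
        print("No " + file_type + " files found")
    else:
        # overwrite=True lets mv replace a file that already exists at the destination
        mssparkutils.fs.mv(origin_path + "/" + filtered_filelist[0],
                           dest_path + "/" + new_name + "." + file_type,
                           overwrite=True)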
