在读取 Synapse Spark 数据帧时，将分区步骤（如 make=Ford）作为列包含在内

我在 ADLS Gen2 存储中使用以下分区策略：

# Example ADLS Gen2 glob: every make=*/model=* partition directory under
# project=cars, reading the *.parquet files inside each one.
# NOTE(review): placeholder URL - a full ABFS URI normally has the form
# abfss://<container>@<account>.dfs.core.windows.net/<path>; confirm before use.
dir_parquet = (
    "abfss://blah.windows.net/container_name/"
    "project=cars/make=*/model=*/*.parquet"
)






// Point basePath at the partition root so Spark's partition discovery turns the
// day=/hour=/platform= directory names found under it into DataFrame columns.
val dataFrame = {
  val partitionGlob = path + "/day=*/hour=*/platform=*/*.parquet"
  sparkSession.read
    .option("basePath", path)
    .parquet(partitionGlob)
}





import pyspark
import pyspark.sql.functions as F
from typing import List
def _parse_partition_url(url: str) -> dict:
    """Parse one blob URL into its file format and Hive-style partition steps.

    Args:
        url: Blob path whose directory segments hold ``key=value`` partition
            steps and whose last segment carries the file extension, e.g.
            ``.../make=Ford/model=Fiesta/*.parquet``.

    Returns:
        ``{'url': url, 'file_format': ext, 'partition_steps': {key: value}}``,
        with the partition steps in the order they appear in the URL.

    Raises:
        TypeError: if ``url`` is not a string.
        ValueError: if the URL has no ``key=value`` segment, or its last
            segment has no file extension.
    """
    if not isinstance(url, str):
        raise TypeError('dir_urls must be a list of string values')
    segments = url.rstrip('/').split('/')

    # partition('=') splits on the first '=' only, so a value that itself
    # contains '=' is kept whole instead of being truncated.
    partition_steps = {}
    for segment in segments:
        if '=' in segment:
            key, _, value = segment.partition('=')
            partition_steps[key] = value
    if not partition_steps:
        raise ValueError('The list of URLs must contain the partition steps, ex. make=ford')

    # Take the extension of the final path segment only. Splitting the whole
    # URL on '.' would return host/path text (e.g. 'net/projects/...') when
    # the URL does not end in a file extension.
    last_segment = segments[-1]
    file_format = last_segment.rsplit('.', 1)[1] if '.' in last_segment else ''
    if not file_format:
        raise ValueError(
            'file_format must not be none, the URL must end in the file format '
            '(.parquet, .csv, etc)'
        )
    return {'url': url, 'file_format': file_format, 'partition_steps': partition_steps}


def _load_dataframe(url: str, file_format: str, partition_steps: dict) -> "pyspark.sql.dataframe.DataFrame":
    """Read ``url`` as ``file_format`` and add each partition step as a column.

    Every value is added as a string literal via ``withColumn``. If the files
    already contain a column with the same name, ``withColumn`` replaces it.
    """
    # NOTE(review): relies on a global ``spark`` session that this file does not
    # define (Synapse notebooks appear to provide one); confirm for other runtimes.
    df = spark.read.format(file_format).load(url)
    for key, value in partition_steps.items():
        df = df.withColumn(key, F.lit(value))
    return df


def load_dataframes_with_partition_steps(dir_urls: List[str]) -> "List[pyspark.sql.dataframe.DataFrame]":
    """Load one DataFrame per blob URL, adding its partition steps as columns.

    Written by: Paul Wilson, 2022-07-29

    Each URL's ``key=value`` path segments (its partition steps) are added to
    the DataFrame read from that URL as literal string columns. The file
    format is taken from the extension of the URL's last segment.

    Ex. input::

        ['abfss://container@yourgen2store.dfs.core.windows.net/projects/cars/make=Vauxhall/model=Astra/transmission=Manual/*.parquet',
         'abfss://container@yourgen2store.dfs.core.windows.net/projects/cars/make=Ford/model=Fiesta/transmission=Automatic/*.parquet']

    ...is parsed into partition steps per URL, such as
    ``{'make': 'Vauxhall', 'model': 'Astra', 'transmission': 'Manual'}``.
    The first DataFrame then gains ``make``, ``model`` and ``transmission``
    columns. The result is ``[df1, df2, ..., dfn]``, in the same order as
    ``dir_urls``.

    Args:
        dir_urls: list of blob URLs/globs that include their partition steps.

    Returns:
        A list of DataFrames, one per URL. Returns an empty list for empty input.

    Raises:
        TypeError: if ``dir_urls`` is not a list, or contains a non-string.
        ValueError: if any URL lacks partition steps or a file extension.
    """
    if not isinstance(dir_urls, list):
        raise TypeError('dir_urls must be a list of string values')
    # Validate every URL before reading anything, so a bad URL late in the list
    # fails fast instead of after earlier (possibly slow) reads.
    parsed_urls = [_parse_partition_url(url) for url in dir_urls]
    return [
        _load_dataframe(parsed['url'], parsed['file_format'], parsed['partition_steps'])
        for parsed in parsed_urls
    ]
