Include the partition steps as columns when reading a Synapse Spark dataframe



I have the following partitioning strategy in ADLS Gen2 storage

dir_parquet = "abfss://blah.windows.net/container_name/project=cars/make=*/model=*/*.parquet"

This loads the already-partitioned data into a dataframe accordingly. I know this can be done in SQL with .filepath(n), and I effectively need the same thing, but in a notebook dataframe.

How can I keep the project, make and model values as separate columns in the dataframe?

According to this other SO thread, setting .option("mergeSchema", "true") on the read should work, but it did not.

Thanks.
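In case it is useful: one notebook-side way to emulate filepath(n) is to read the files and derive the partition values from input_file_name(). A minimal PySpark sketch, assuming the layout from the question (the regexes and column names are illustrative):

import pyspark.sql.functions as F

# read everything under the project, using the path from the question
df = spark.read.parquet("abfss://blah.windows.net/container_name/project=cars/make=*/model=*/*.parquet")

# derive each partition value from the full file path of every row
df = (df
      .withColumn("_path", F.input_file_name())
      .withColumn("project", F.regexp_extract("_path", r"project=([^/]+)", 1))
      .withColumn("make", F.regexp_extract("_path", r"make=([^/]+)", 1))
      .withColumn("model", F.regexp_extract("_path", r"model=([^/]+)", 1))
      .drop("_path"))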

Have you tried adding the base path to the call:

val dataFrame = sparkSession.read
.option("basePath", path)
.parquet(path + "/day=*/hour=*/platform=*/*.parquet")

If I don't add the base path, I don't get the day/hour/platform columns. I went with this option.

This article explains why it doesn't work without the base path, but neglects to mention that you can add it manually.
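For reference, the same basePath idea in PySpark, adapted to the layout from the question (where exactly the base path starts is an assumption; it should point at the directory above the partition folders):

base_path = "abfss://blah.windows.net/container_name"

df = (spark.read
      .option("basePath", base_path)
      .parquet(base_path + "/project=cars/make=*/model=*/*.parquet"))

# with basePath set, partition discovery keeps project, make and model
# as ordinary columns in df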

Since I didn't receive any answers and couldn't find an official way to do this, I wrote the code below.

Anyone with this problem may also find it useful to list blob directories recursively; if so, see the deep_ls function here (rather than my code).
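(For illustration only, ahead of the main code below: a rough sketch of such a recursive lister using mssparkutils. The import path and the FileInfo attribute names (isDir, path) are assumptions about the Synapse runtime, so check them against your environment or use the linked deep_ls instead.)

from notebookutils import mssparkutils  # available in Synapse notebooks; import path may vary

def list_leaf_dirs(path: str) -> list:
    """Hypothetical helper: recursively collect directories that have no sub-directories."""
    children = mssparkutils.fs.ls(path)
    sub_dirs = [c for c in children if c.isDir]  # attribute name assumed
    if not sub_dirs:
        return [path]
    leaves = []
    for d in sub_dirs:
        leaves.extend(list_leaf_dirs(d.path))
    return leaves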

import pyspark
import pyspark.sql.functions as F
from typing import List

def load_dataframes_with_partition_steps(dir_urls:List[str]) -> List[pyspark.sql.dataframe.DataFrame]:
    """
    Written by: Paul Wilson, 2022-07-29
    Takes in a list of blob directories including their partition steps and returns a list of
    dataframes with the associated partition steps in the dataframe.
    Ex. input...:
    ['abfss://container@yourgen2store.dfs.core.windows.net/projects/cars/make=Vauxhall/model=Astra/transmission=Manual',
     'abfss://container@yourgen2store.dfs.core.windows.net/projects/cars/make=Ford/model=Fiesta/transmission=Automatic']
    ...which is turned into a list of dicts...
    [{'url': 'abfss://container@yourgen2store.dfs.core.windows.net/projects/cars/make=Vauxhall/model=Astra/transmission=Manual',
      'make': 'Vauxhall',
      'model': 'Astra',
      'transmission': 'Manual'},
     {'url': 'abfss://container@yourgen2store.dfs.core.windows.net/projects/cars/make=Ford/model=Fiesta/transmission=Automatic',
      'make': 'Ford',
      'model': 'Fiesta',
      'transmission': 'Automatic'}]
    ...and from that list a list of dataframes per url and associated partition steps, such as:
    [df1, df2, ..., dfn]
    """
    def load_dataframe(url:str=None, partition_steps:dict={}, file_format:str=None, df:pyspark.sql.dataframe.DataFrame=None) -> pyspark.sql.dataframe.DataFrame:
        """
        Recursively load a dataframe and apply the partition steps via withColumn
        """
        if file_format is None or len(file_format) == 0:
            raise ValueError('file_format must not be None; the URL must end in the file format (.parquet, .csv, etc.)')
        # if there is a url and non-empty partition steps without a df loaded, then load the dataframe
        if (url is not None and len(partition_steps.keys()) > 0 and df is None):
            df = spark.read.format(file_format).load(url)
            # df is loaded, so do not pass a url, indicating it is loaded
            return load_dataframe(url=None, partition_steps=partition_steps, file_format=file_format, df=df)
        # if here then the df is loaded, so proceed to apply withColumn
        if (url is None and df is not None and len(partition_steps.keys()) > 0):
            # take the first item in the partition steps dict
            key = list(partition_steps.keys())[0]
            value = list(partition_steps.values())[0]
            # remove the first item from the partition steps dict
            partition_steps.pop(key)
            # add the partition step to the dataframe as a literal column
            df = df.withColumn(key, F.lit(value))
            return load_dataframe(url=None, partition_steps=partition_steps, file_format=file_format, df=df)
        # if it makes it here then the dataframe is loaded and the partition steps are applied
        return df

    # list of dataframe dict values of url and partition steps
    list_df_dicts = list()
    if not isinstance(dir_urls, list):
        raise TypeError('dir_urls must be a list of string values')
    # iterate over all urls and generate a dict of partition values
    for url in dir_urls:
        # dict to store url and partition steps
        d_dict = dict()
        d_dict['url'] = url
        # get the format from the last part of the url
        file_format = url.split('.')[-1]
        d_dict['file_format'] = file_format
        # split the url keeping only partition steps (ex. make=Vauxhall)
        url_split = [u for u in d_dict['url'].split('/') if '=' in u]
        if len(url_split) == 0:
            raise ValueError('The list of URLs must contain the partition steps, ex. make=ford')
        # turn the partition=item into a key:value
        partition_items = [u.split('=') for u in url_split]
        # iterate over every item in partition_items=[['key', 'value']] and set dict[key] = value
        for item in partition_items:
            key = item[0]
            value = item[1]
            d_dict[key] = value
        list_df_dicts.append(d_dict)
    # iterate over all the dicts and load the dataframes to a list with their partition steps in place
    list_dfs = list()
    for d_dict in list_df_dicts:
        # get the url from the d_dict
        url = d_dict['url']
        # get the format
        file_format = d_dict['file_format']
        # remove the non-partition keys so only partition steps remain
        d_dict.pop('url')
        d_dict.pop('file_format')
        df = load_dataframe(url=url, partition_steps=d_dict, file_format=file_format)
        list_dfs.append(df)
    # return the list of dataframes
    return list_dfs
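A usage sketch, reusing the example URLs from the docstring (with /*.parquet appended so each URL ends in the file format, as the helper expects), plus unionByName to stitch the per-directory dataframes back together, assuming their schemas match:

from functools import reduce

dir_urls = [
    "abfss://container@yourgen2store.dfs.core.windows.net/projects/cars/make=Vauxhall/model=Astra/transmission=Manual/*.parquet",
    "abfss://container@yourgen2store.dfs.core.windows.net/projects/cars/make=Ford/model=Fiesta/transmission=Automatic/*.parquet",
]

dfs = load_dataframes_with_partition_steps(dir_urls)

# optionally combine the per-directory dataframes into one
combined = reduce(lambda a, b: a.unionByName(b), dfs)
combined.select("make", "model", "transmission").show()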
