我有一个镶木地板按以下方式分区:
data
/batch_date=2020-01-20
/batch_date=2020-01-21
/batch_date=2020-01-22
/batch_date=2020-01-23
/batch_date=2020-01-24
这里batch_date分区列是日期类型。
我只想从最新的日期分区读取数据,但作为消费者,我不知道最新的值是什么。
我可以通过类似的东西使用一个简单的组
df.groupby().agg(max(col('batch_date'))).first()
虽然这会起作用,但这是一种非常低效的方式,因为它涉及分组。
我想知道我们是否可以以更有效的方式查询最新分区。
谢谢。
执行@pasha701建议的方法将涉及加载整个 Spark 数据帧以及所有batch_date分区,然后找到最大值。我认为作者要求一种直接找到最大分区日期并仅加载该日期的方法。 一种方法是使用 hdfs 或 s3fs,并将 s3 路径的内容作为列表加载,然后找到最大分区,然后仅加载该分区。这样会更有效率。
假设您使用的是 AWS s3 格式,如下所示:
import sys
import s3fs
datelist=[]
inpath="s3:bucket_path/data/"
fs = s3fs.S3FileSystem(anon=False)
Dirs = fs.ls(inpath)
for paths in Dirs:
date=paths.split('=')[1]
datelist.append(date)
maxpart=max(datelist)
df=spark.read.parquet("s3://bucket_path/data/batch_date=" + maxpart)
这将完成列表中的所有工作,而无需将任何内容加载到内存中,直到找到您要加载的内容。
函数"max"可以在没有"groupBy"的情况下使用:
df.select(max("batch_date"))
使用显示分区获取表的所有分区
show partitions TABLENAME
输出将像
pt=2012.07.28.08/is_complete=1
pt=2012.07.28.09/is_complete=1
我们可以使用以下查询获取数据表单特定分区
select * from TABLENAME where pt='2012.07.28.10' and is_complete='1' limit 1;
或者可以对其应用其他过滤器或分组依据。
这在 Pyspark v2.4.3 中对我有用。首先提取分区(这适用于日期列上具有单个分区的数据帧,当表具有>1 个分区时尚未尝试(:
df_partitions = spark.sql("show partitions database.dataframe")
">显示分区"返回具有名为"分区"的单列的数据帧,其值为 partitioned_col=2022-10-31。现在我们创建一个"值"列,仅将日期部分提取为字符串。然后将其转换为日期并取最大值:
date_filter = df_partitions.withColumn('value', to_date(split('partition', '=')[1], 'yyyy-MM-dd')).agg({"value":"max"}).first()[0]
date_filter 包含分区中的最大日期,可用于从同一表中提取的 where 子句。
parquet 文件应在其元数据中包含列的最大值,因此理想情况下,Spark 会利用这一点来加快查询速度。
这似乎在 Spark 3.3+ 中受支持,但目前默认禁用.
您需要启用spark.sql.parquet.aggregatePushdown
。
来自火花镶木地板文档:
如果为 true,聚合将被下推到 Parquet 进行优化。支持 MIN、MAX 和 COUNT 作为聚合表达式。对于最小值/最大值,支持布尔值、整数、浮点数和日期类型。对于 COUNT,支持所有数据类型。如果任何 Parquet 文件页脚中缺少统计信息,则会引发异常。