Spark 查找日期分区列的最大值



我有一个镶木地板按以下方式分区:

data
/batch_date=2020-01-20
/batch_date=2020-01-21
/batch_date=2020-01-22
/batch_date=2020-01-23
/batch_date=2020-01-24

这里batch_date分区列是日期类型。

我只想从最新的日期分区读取数据,但作为消费者,我不知道最新的值是什么。

我可以通过类似的东西使用一个简单的组

df.groupby().agg(max(col('batch_date'))).first()

虽然这会起作用,但这是一种非常低效的方式,因为它涉及分组。

我想知道我们是否可以以更有效的方式查询最新分区。

谢谢。

执行@pasha701建议的方法将涉及加载整个 Spark 数据帧以及所有batch_date分区,然后找到最大值。我认为作者要求一种直接找到最大分区日期并仅加载该日期的方法。 一种方法是使用 hdfs 或 s3fs,并将 s3 路径的内容作为列表加载,然后找到最大分区,然后仅加载该分区。这样会更有效率。

假设您使用的是 AWS s3 格式,如下所示:

import sys
import s3fs
datelist=[]
inpath="s3:bucket_path/data/"
fs = s3fs.S3FileSystem(anon=False)
Dirs = fs.ls(inpath)
for paths in Dirs:
date=paths.split('=')[1]
datelist.append(date)
maxpart=max(datelist)
df=spark.read.parquet("s3://bucket_path/data/batch_date=" + maxpart)

这将完成列表中的所有工作,而无需将任何内容加载到内存中,直到找到您要加载的内容。

函数"max"可以在没有"groupBy"的情况下使用:

df.select(max("batch_date"))

使用显示分区获取表的所有分区

show partitions TABLENAME

输出将像

pt=2012.07.28.08/is_complete=1
pt=2012.07.28.09/is_complete=1

我们可以使用以下查询获取数据表单特定分区

select * from TABLENAME where pt='2012.07.28.10' and is_complete='1' limit 1;

或者可以对其应用其他过滤器或分组依据。

这在 Pyspark v2.4.3 中对我有用。首先提取分区(这适用于日期列上具有单个分区的数据帧,当表具有>1 个分区时尚未尝试(:

df_partitions = spark.sql("show partitions database.dataframe")
">

显示分区"返回具有名为"分区"的单列的数据帧,其值为 partitioned_col=2022-10-31。现在我们创建一个"值"列,仅将日期部分提取为字符串。然后将其转换为日期并取最大值:

date_filter = df_partitions.withColumn('value', to_date(split('partition', '=')[1], 'yyyy-MM-dd')).agg({"value":"max"}).first()[0]

date_filter 包含分区中的最大日期,可用于从同一表中提取的 where 子句。

parquet 文件应在其元数据中包含列的最大值,因此理想情况下,Spark 会利用这一点来加快查询速度。

这似乎在 Spark 3.3+ 中受支持,但目前默认禁用.
您需要启用spark.sql.parquet.aggregatePushdown

来自火花镶木地板文档:

如果为 true,聚合将被下推到 Parquet 进行优化。支持 MIN、MAX 和 COUNT 作为聚合表达式。对于最小值/最大值,支持布尔值、整数、浮点数和日期类型。对于 COUNT,支持所有数据类型。如果任何 Parquet 文件页脚中缺少统计信息,则会引发异常。

最新更新