我希望提高运行过滤逻辑的性能。要做到这一点,思路是通过将分区列设置为数据集中的一个列(称为splittable_column
)来进行hive分区设置。
我检查了可分割列的基数很低,如果我从splitting_column
中提取每个值,最终结果是一个800MB的拼花文件。
如果我的数据集的基数是3,我的目标是让数据布局如下:
spark/splittable_column=Value A/part-00000-abc.c000.snappy.parquet
spark/splittable_column=Value B/part-00000-def.c000.snappy.parquet
spark/splittable_column=Value C/part-00000-ghi.c000.snappy.parquet
当我运行my_output_df.write_dataframe(df_with_logic,partition_cols=["splittable_column"])
并查看结果时,我看到目录中有许多KB范围内的文件,这将在读取期间造成很大的开销。例如,我的数据集看起来像:
spark/splittable_column=Value A/part-00000-abc.c000.snappy.parquet
spark/splittable_column=Value A/part-00001-abc.c000.snappy.parquet
spark/splittable_column=Value A/part-00002-abc.c000.snappy.parquet
...
spark/splittable_column=Value A/part-00033-abc.c000.snappy.parquet
spark/splittable_column=Value B/part-00000-def.c000.snappy.parquet
...
spark/splittable_column=Value B/part-00030-def.c000.snappy.parquet
spark/splittable_column=Value C/part-00000-ghi.c000.snappy.parquet
...
spark/splittable_column=Value C/part-00032-ghi.c000.snappy.parquet
etc.
从文档中我了解到:
你至少会有分区列
中的每个唯一值对应一个输出文件
我如何配置转换,我得到在Hive分区期间每个任务最多1个输出文件?
如果查看输入数据,您可能会注意到数据被分割为多个拼花文件。当您查看仅运行my_output_df.write_dataframe(df_with_logic,partition_cols=["splittable_column"])
的构建报告时,您可能会注意到查询计划中没有shuffle。
IE,你会看到:
图:
Scan
Project
BasicStats
Execute
计划:
FoundrySaveDatasetCommand `ri.foundry.main.transaction.xxx@master`.`ri.foundry.main.dataset.yyy`, ErrorIfExists, [column1 ... 17 more fields],
+- BasicStatsNode `ri.foundry.main.transaction.zzz@master`.`ri.foundry.main.dataset.aaa`
+- Project [splitable_column ... 17 more fields]
+- Relation !ri.foundry.main.transaction.xxx:master.ri.foundry.main.dataset.yyy[splittable_column... 17 more fields] parquet
在这个例子中,它只花了1分钟运行,因为没有shuffle。
现在,如果你在列上重新分区,你将按照:
df_with_logic = df_with_logic.repartition("splittable_column")
my_output_df.write_dataframe(df_with_logic,partition_cols=["splittable_column"]
它将强制Exchange
,即splittable_column
上的RepartitionByExpression
,这将需要更长的时间(在我的情况下为15分钟),但数据将按我想要的方式分割:
spark/splittable_column=Value A/part-00000-abc.c000.snappy.parquet
spark/splittable_column=Value B/part-00000-def.c000.snappy.parquet
spark/splittable_column=Value C/part-00000-ghi.c000.snappy.parquet
图:
Scan
Exchange
Project
BasicStats
Execute
计划:
ri.foundry.main.transaction.xxx@master`.`ri.foundry.main.dataset.yyy`, ErrorIfExists, [column1 ... 17 more fields],
+- BasicStatsNode `ri.foundry.main.transaction.zzz@master`.`ri.foundry.main.dataset.aaa`
+- Project [splitable_column ... 17 more fields]
+- RepartitionByExpression [splittable_column], 1
+- Relation !ri.foundry.main.transaction.xxx:master.ri.foundry.main.dataset.yyy[splittable_column... 17 more fields] parquet