在Foundry,我怎么能Hive分区只有一个parquet文件每个值?



我希望提高运行过滤逻辑的性能。要做到这一点,思路是通过将分区列设置为数据集中的一个列(称为splittable_column)来进行hive分区设置。

我检查了可分割列的基数很低,如果我从splitting_column中提取每个值,最终结果是一个800MB的拼花文件。

如果我的数据集的基数是3,我的目标是让数据布局如下:

spark/splittable_column=Value A/part-00000-abc.c000.snappy.parquet  
spark/splittable_column=Value B/part-00000-def.c000.snappy.parquet  
spark/splittable_column=Value C/part-00000-ghi.c000.snappy.parquet  

当我运行my_output_df.write_dataframe(df_with_logic,partition_cols=["splittable_column"])并查看结果时,我看到目录中有许多KB范围内的文件,这将在读取期间造成很大的开销。例如,我的数据集看起来像:

spark/splittable_column=Value A/part-00000-abc.c000.snappy.parquet
spark/splittable_column=Value A/part-00001-abc.c000.snappy.parquet  
spark/splittable_column=Value A/part-00002-abc.c000.snappy.parquet  
...
spark/splittable_column=Value A/part-00033-abc.c000.snappy.parquet  
spark/splittable_column=Value B/part-00000-def.c000.snappy.parquet
...
spark/splittable_column=Value B/part-00030-def.c000.snappy.parquet
spark/splittable_column=Value C/part-00000-ghi.c000.snappy.parquet
...
spark/splittable_column=Value C/part-00032-ghi.c000.snappy.parquet
etc.

从文档中我了解到:

你至少会有分区列

中的每个唯一值对应一个输出文件

我如何配置转换,我得到在Hive分区期间每个任务最多1个输出文件?

如果查看输入数据,您可能会注意到数据被分割为多个拼花文件。当您查看仅运行my_output_df.write_dataframe(df_with_logic,partition_cols=["splittable_column"])的构建报告时,您可能会注意到查询计划中没有shuffle。

IE,你会看到:

图:

Scan
Project
BasicStats
Execute

计划:

FoundrySaveDatasetCommand `ri.foundry.main.transaction.xxx@master`.`ri.foundry.main.dataset.yyy`, ErrorIfExists, [column1 ... 17 more fields],
+- BasicStatsNode `ri.foundry.main.transaction.zzz@master`.`ri.foundry.main.dataset.aaa`
+- Project [splitable_column ... 17 more fields]
+- Relation !ri.foundry.main.transaction.xxx:master.ri.foundry.main.dataset.yyy[splittable_column... 17 more fields] parquet

在这个例子中,它只花了1分钟运行,因为没有shuffle。

现在,如果你在列上重新分区,你将按照:

df_with_logic = df_with_logic.repartition("splittable_column")
my_output_df.write_dataframe(df_with_logic,partition_cols=["splittable_column"]

它将强制Exchange,即splittable_column上的RepartitionByExpression,这将需要更长的时间(在我的情况下为15分钟),但数据将按我想要的方式分割:

spark/splittable_column=Value A/part-00000-abc.c000.snappy.parquet  
spark/splittable_column=Value B/part-00000-def.c000.snappy.parquet  
spark/splittable_column=Value C/part-00000-ghi.c000.snappy.parquet  

图:

Scan
Exchange
Project
BasicStats
Execute

计划:

ri.foundry.main.transaction.xxx@master`.`ri.foundry.main.dataset.yyy`, ErrorIfExists, [column1 ... 17 more fields],
+- BasicStatsNode `ri.foundry.main.transaction.zzz@master`.`ri.foundry.main.dataset.aaa`
+- Project [splitable_column ... 17 more fields]
+- RepartitionByExpression [splittable_column], 1
+- Relation !ri.foundry.main.transaction.xxx:master.ri.foundry.main.dataset.yyy[splittable_column... 17 more fields] parquet

最新更新