如何使我的多联接/多联合数据集计算速度更快



我有一系列大约30个数据集,所有这些数据集都需要连接在一起才能形成一个大的最终表。最后一张表需要大约5年的时间(每年一张表(,将它们合并在一起,然后将整个历史与其他表的完整历史(类似地合并(结合起来,形成一个大的、历史性的、宽的表。

这些第一张每年表格的布局如下:

table_type_1:
| primary_key | year |
|-------------|------|
| key_1       | 0    |
| key_2       | 0    |
| key_3       | 0    |

其他年份表如下:

table_type_1:
| primary_key | year |
|-------------|------|
| key_1       | 1    |
| key_2       | 1    |

然后将这些统一在一起创建:

table_type_1:
| primary_key | year |
|-------------|------|
| key_1       | 0    |
| key_2       | 0    |
| key_3       | 0    |
| key_1       | 1    |
| key_2       | 1    |

类似地,第二种类型的表在统一后会产生以下结果:

table_type_2:
| primary_key | year |
|-------------|------|
| key_1       | 0    |
| key_2       | 0    |
| key_3       | 0    |
| key_1       | 1    |
| key_2       | 1    |

我现在想在primary_keyyear上将table_type_1table_type_2连接起来,以生成更宽的表。我注意到这个最后的联接需要非常长的时间,并且会打乱很多数据。

我怎样才能更快?

您可以对primary_keyyear列上的每年表使用bucketing,将其放入完全相同数量的bucket中,以避免在计算最终联接时进行昂贵的交换。

- output: table_type_1_year_0
input: raw_table_type_1_year_0
hive_partitioning: none
bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
- output: table_type_1_year_1
input: raw_table_type_1_year_1
hive_partitioning: none
bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
...
- output: table_type_2_year_0
input: raw_table_type_2_year_0
hive_partitioning: none
bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
- output: table_type_2_year_1
input: raw_table_type_2_year_1
hive_partitioning: none
bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
...
- output: all_tables
input:
- table_type_1_year_0
- table_type_1_year_1
...
- table_type_2_year_0
- table_type_2_year_1
...
hive_partitioning: none
bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)

注意:当您选择BUCKET_COUNT值时,重要的是要了解它应该针对最终的all_tables输出进行优化,而不是针对中间表进行优化。这意味着对于中间表来说,最终可能会得到非常小的文件。与all_tables输出的效率增益相比,这可能无关紧要,因为在将所有内容连接起来时,您不必计算大量交换;您的bucket将被预先计算,您可以简单地对输入文件执行SortMergeJoin

对于一个关于如何写出指定数量的bucket的转换的明确示例,我在这里的回答可能很有用。

我的建议是:在小数据集上进行第一次并集,然后广播数据集,即第一次并合的结果,spark将在其不同的节点上部署该数据集,这将减少洗牌次数。spark上的并集经过了很好的优化,所以你要做的就是考虑拥有:从一开始就只选择你需要的列,避免在并集之前进行任何非成本效益的操作,比如groupByKey。。。等等,因为spark在进行最终处理时会调用这些操作。我建议你避免使用hive,因为它使用的map reduce策略与spark sql相比是不值得的。你可以使用这个函数的例子,只需更改键,如果可以的话,使用scala,它将直接与spark交互:

def map_To_cells(df1: DataFrame, df2: DataFrame): DataFrame = {
val df0= df2.withColumn("key0",F.col("key")).drop("key")
df1.as("main").join(
broadcast(df0),
df0("key0") <=> df("key")
).select( needed columns)
}  

最新更新