我有一系列大约30个数据集,所有这些数据集都需要连接在一起才能形成一个大的最终表。最后一张表需要大约5年的时间(每年一张表(,将它们合并在一起,然后将整个历史与其他表的完整历史(类似地合并(结合起来,形成一个大的、历史性的、宽的表。
这些第一张每年表格的布局如下:
table_type_1:
| primary_key | year |
|-------------|------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
其他年份表如下:
table_type_1:
| primary_key | year |
|-------------|------|
| key_1 | 1 |
| key_2 | 1 |
然后将这些统一在一起创建:
table_type_1:
| primary_key | year |
|-------------|------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
| key_1 | 1 |
| key_2 | 1 |
类似地,第二种类型的表在统一后会产生以下结果:
table_type_2:
| primary_key | year |
|-------------|------|
| key_1 | 0 |
| key_2 | 0 |
| key_3 | 0 |
| key_1 | 1 |
| key_2 | 1 |
我现在想在primary_key
和year
上将table_type_1
与table_type_2
连接起来,以生成更宽的表。我注意到这个最后的联接需要非常长的时间,并且会打乱很多数据。
我怎样才能更快?
您可以对primary_key
和year
列上的每年表使用bucketing,将其放入完全相同数量的bucket中,以避免在计算最终联接时进行昂贵的交换。
- output: table_type_1_year_0
input: raw_table_type_1_year_0
hive_partitioning: none
bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
- output: table_type_1_year_1
input: raw_table_type_1_year_1
hive_partitioning: none
bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
...
- output: table_type_2_year_0
input: raw_table_type_2_year_0
hive_partitioning: none
bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
- output: table_type_2_year_1
input: raw_table_type_2_year_1
hive_partitioning: none
bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
...
- output: all_tables
input:
- table_type_1_year_0
- table_type_1_year_1
...
- table_type_2_year_0
- table_type_2_year_1
...
hive_partitioning: none
bucketing: BUCKET_COUNT by (PRIMARY_KEY, YEAR)
注意:当您选择BUCKET_COUNT
值时,重要的是要了解它应该针对最终的all_tables
输出进行优化,而不是针对中间表进行优化。这意味着对于中间表来说,最终可能会得到非常小的文件。与all_tables
输出的效率增益相比,这可能无关紧要,因为在将所有内容连接起来时,您不必计算大量交换;您的bucket将被预先计算,您可以简单地对输入文件执行SortMergeJoin
。
对于一个关于如何写出指定数量的bucket的转换的明确示例,我在这里的回答可能很有用。
我的建议是:在小数据集上进行第一次并集,然后广播数据集,即第一次并合的结果,spark将在其不同的节点上部署该数据集,这将减少洗牌次数。spark上的并集经过了很好的优化,所以你要做的就是考虑拥有:从一开始就只选择你需要的列,避免在并集之前进行任何非成本效益的操作,比如groupByKey。。。等等,因为spark在进行最终处理时会调用这些操作。我建议你避免使用hive,因为它使用的map reduce策略与spark sql相比是不值得的。你可以使用这个函数的例子,只需更改键,如果可以的话,使用scala,它将直接与spark交互:
def map_To_cells(df1: DataFrame, df2: DataFrame): DataFrame = {
val df0= df2.withColumn("key0",F.col("key")).drop("key")
df1.as("main").join(
broadcast(df0),
df0("key0") <=> df("key")
).select( needed columns)
}