I have two DataFrames, one with 10M records and the other with 100K records.
Suppose the schema of the first DataFrame is
create table fact_table
(
id string,
dim_id_list ARRAY<string>
);
where dim_id_list is an array of length 1-200,
and the dimension table is
create table dim_table
(
id string,
tag string
)
where 80K+ ids can share the same tag.
Sample data for fact_table:
id | dim_id_list
----------------------
1 | [a, b]
2 | [a, b, c]
3 | [a, b, c, d]
4 | [a, b, c, d, e]
5 | [a, b, c, d, e, f]
....
Sample data for dim_table:
id | tag
----------------------
a | john
b | foo
c | foo
d | foo
f | foo
g | bar
h | random
i | spark
.......
What I want to do is attach the unique tags to each row of the fact table, as in the following output.
Output:
id | dim_id_list | tag
---------------------------------
1 | [a, b] | john
1 | [a, b] | foo
2 | [a, b, c] | john
2 | [a, b, c] | foo
3 | [a, b, c, d] | john
3 | [a, b, c, d] | foo
4 | [a, b, c, d, e] | john
4 | [a, b, c, d, e] | foo
4 | [a, b, c, d, e] | random
5 | [a, b, c, d, e, f] | john
5 | [a, b, c, d, e, f] | foo
5 | [a, b, c, d, e, f] | random
5 | [a, b, c, d, e, f] | bar
.....
Basically it is a left join, but with unique tags.
The query I wrote is
select fact_table.id, dim_id_list, tag
from fact_table
left join (select collect_list(id) as id_list,
tag
from dim_table
group by tag) as dim_table
on arrays_overlap(fact_table.dim_id_list, dim_table.id_list);
The Spark Scala equivalent:
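import org.apache.spark.sql.functions.{arrays_overlap, broadcast, collect_set}

// one row per tag, holding the set of ids that carry that tag
val dimDf_agg = dimDf
  .groupBy("tag")
  .agg(collect_set("id").as("id_list"))
// left join every fact row to every tag whose id set overlaps its dim_id_list
val join_df = fact_df.join(broadcast(dimDf_agg),
    arrays_overlap(fact_df("dim_id_list"), dimDf_agg("id_list")), "left")
  .drop("id_list")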
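But this is very slow to execute, because arrays_overlap has very high complexity.
Is there a more optimized approach I can follow?
Something like a broadcast (left) join on a hash of id_list and dim_id_list, taking just the first 'foo' or something similar?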
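What you can do is explode the array (so that one row is generated for each item in dim_id_list) and then join on that value. You can then use distinct to remove the duplicates.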
// Generate the sample data for reproducibility (outside spark-shell, first
// import spark.implicits._ and org.apache.spark.sql.functions._)
val fact_df = Seq(
1 -> Seq("a","b"),
2 -> Seq("a","b","c"),
3 -> Seq("a","b","c","d"),
4 -> Seq("a","b","c","d","e"),
5 -> Seq("a","b","c","d","e","f")
).toDF("id", "dim_id_list")
val dim_df = Seq(
"a" -> "john",
"b" -> "foo",
"c" -> "foo",
"d" -> "foo",
"f" -> "foo",
"g" -> "bar",
"h" -> "random",
"i" -> "spark"
).toDF("id", "tag")
The solution:
fact_df
.withColumn("id_tag", explode('dim_id_list))
.join(dim_df.select('id as "id_tag", 'tag), Seq("id_tag"))
.drop("id_tag")
.distinct
.orderBy("id")
.show
+---+------------------+----+
| id| dim_id_list| tag|
+---+------------------+----+
| 1| [a, b]|john|
| 1| [a, b]| foo|
| 2| [a, b, c]|john|
| 2| [a, b, c]| foo|
| 3| [a, b, c, d]|john|
| 3| [a, b, c, d]| foo|
| 4| [a, b, c, d, e]| foo|
| 4| [a, b, c, d, e]|john|
| 5|[a, b, c, d, e, f]|john|
| 5|[a, b, c, d, e, f]| foo|
+---+------------------+----+
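Note that the join above is an inner join, so a fact row whose ids match nothing in the dimension table disappears from the result. If the left-join behaviour from the question is needed, one possible variant is sketched below. It is only a sketch under a few assumptions: fact ids are unique, the 100K-row dimension table is small enough to broadcast, and the names factDf, dimDf, tagsPerFact and result (as well as the tiny data set and the local SparkSession) are made up for illustration. It deduplicates on (id, tag) before the arrays are brought back in, then left-joins the tags onto the fact table by id.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, col, explode}

// Assumption: a local session, for illustration only.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("explode-left-join-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical miniature data: fact row 2 has no matching dimension id.
val factDf = Seq(
  1 -> Seq("a", "b", "c"),
  2 -> Seq("e")
).toDF("id", "dim_id_list")

val dimDf = Seq("a" -> "john", "b" -> "foo", "c" -> "foo").toDF("id", "tag")

// Step 1: one row per (fact id, dimension id), equi-joined to the broadcast
// dimension table, then reduced to the distinct (id, tag) pairs.
val tagsPerFact = factDf
  .select(col("id"), explode(col("dim_id_list")).as("dim_id"))
  .join(broadcast(dimDf.select(col("id").as("dim_id"), col("tag"))), Seq("dim_id"))
  .select("id", "tag")
  .distinct()

// Step 2: left join back onto the fact table (assumes fact ids are unique),
// so facts without any tag are kept with a null tag.
val result = factDf.join(tagsPerFact, Seq("id"), "left")

result.show()
```

With this data, id 1 should get one row each for john and foo (the duplicate foo from b and c is collapsed), and id 2 should be kept with a null tag. Deduplicating the narrow (id, tag) pairs rather than rows that still carry the array column is a design choice meant to keep the shuffle small; whether it is actually faster on the real 10M-row table would have to be measured.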