Many-to-many join between a fact table and a dim table in Spark 3?



I have two DataFrames, one with 10M records and the other with 100K records.

Assume the schema of the first DataFrame is:

create table fact_table
(
id      string,
dim_id_list ARRAY<string>
);

where dim_id_list is an array of length 1 to 200.

and the second DataFrame, the dimension table:

create table dim_table
(
id      string,
tag     string
);

where 80K+ ids can share the same tag.

Sample (random) data for fact_table:

id |  dim_id_list
----------------------
1  | [a, b]
2  | [a, b, c]
3  | [a, b, c, d]
4  | [a, b, c, d, e]
5  | [a, b, c, d, e, f]
.... 

dim_table:

id |      tag
----------------------
a  |     john            
b  |     foo          
c  |     foo          
d  |     foo          
f  |     foo          
g  |     bar          
h  |     random          
i  |     spark          
.......

What I want to do is add the unique tags to the fact table, as shown in the output below.

Output:

id |  dim_id_list       |   tag
---------------------------------
1  | [a, b]             |   john
1  | [a, b]             |   foo
2  | [a, b, c]          |   john
2  | [a, b, c]          |   foo
3  | [a, b, c, d]       |   john 
3  | [a, b, c, d]       |   foo 
4  | [a, b, c, d, e]    |   john      
4  | [a, b, c, d, e]    |   foo      
4  | [a, b, c, d, e]    |   random      
5  | [a, b, c, d, e, f] |   john         
5  | [a, b, c, d, e, f] |   foo         
5  | [a, b, c, d, e, f] |   random         
5  | [a, b, c, d, e, f] |   bar   
..... 

Basically it is a left join, but with unique tags.
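To pin down the intended semantics, here is a minimal model on in-memory Scala collections (no Spark involved), assuming each dim id appears once in dim_table: for each fact row, look up every id in its array, keep the distinct tags, and, as in a left join, keep the fact row with no tag when nothing matches. The names `FactRow`, `DimRow`, `UniqueTagsModel` and `uniqueTags` are hypothetical and used only for illustration.

```scala
// Plain-Scala model of "left join, but with unique tags" (not Spark code).
final case class FactRow(id: Int, dimIdList: Seq[String])
final case class DimRow(id: String, tag: String)

object UniqueTagsModel {
  // Hypothetical helper: one output row per (fact row, distinct tag).
  // A fact row whose ids match nothing keeps a single row with tag = None,
  // mirroring the null tag a left join would produce.
  def uniqueTags(facts: Seq[FactRow], dims: Seq[DimRow]): Seq[(Int, Seq[String], Option[String])] = {
    // id -> tag lookup; assumes each dim id appears only once
    val tagById: Map[String, String] = dims.map(d => d.id -> d.tag).toMap
    facts.flatMap { f =>
      // distinct tags reachable from this row's ids, in first-seen order
      val tags = f.dimIdList.flatMap(id => tagById.get(id).toList).distinct
      if (tags.isEmpty) Seq((f.id, f.dimIdList, Option.empty[String]))
      else tags.map(t => (f.id, f.dimIdList, Option(t)))
    }
  }
}
```

Several ids mapping to the same tag (b, c and d all map to foo) collapse into one output row per fact row, which is the "unique tags" part of the requirement.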

The query I wrote is:

select fact_table.id, dim_id_list, tag
from fact_table
left join (select collect_list(id) as id_list,
tag
from dim_table
group by tag) as dim_table
on arrays_overlap(fact_table.dim_id_list, dim_table.id_list);

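Spark Scala equivalent:

import org.apache.spark.sql.functions.{arrays_overlap, broadcast, collect_set}

// Aggregate all dim ids per tag (one row per tag)
val dimDf_agg = dimDf
  .groupBy("tag")
  .agg(collect_set("id").as("id_list"))

// Non-equi join: each fact row is tested against each tag group
val join_df = fact_df
  .join(broadcast(dimDf_agg),
    arrays_overlap(fact_df("dim_id_list"), dimDf_agg("id_list")), "left")
  .drop("id_list")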

But this is very slow to execute, because arrays_overlap is very expensive here.
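Part of the cost comes from the join condition itself: `arrays_overlap` is not an equality, so Spark cannot use a hash join on it and instead has to evaluate the predicate for every (fact row, tag group) pair (a broadcast nested loop join when the aggregated side is broadcast). The sketch below is a plain-Scala model of that strategy, not Spark code; `NestedLoopModel` and `nestedLoopTags` are hypothetical names, and it returns only inner matches to stay short. It counts the overlap tests, which come out to exactly (number of fact rows) × (number of tags).

```scala
// Plain-Scala model of the question's plan: aggregate ids per tag,
// then test every (fact row, tag group) pair with an overlap predicate.
object NestedLoopModel {
  def nestedLoopTags(
      facts: Seq[(Int, Seq[String])],
      dims: Seq[(String, String)] // (id, tag)
  ): (Seq[(Int, String)], Int) = {
    // Stand-in for groupBy("tag").agg(collect_set("id"))
    val idsByTag: Map[String, Set[String]] =
      dims.groupBy(_._2).map { case (tag, rows) => tag -> rows.map(_._1).toSet }
    val tagGroups = idsByTag.toSeq.sortBy(_._1) // sorted for deterministic output
    var overlapTests = 0
    val matches = facts.flatMap { case (factId, ids) =>
      tagGroups.flatMap { case (tag, tagIds) =>
        overlapTests += 1
        // stand-in for arrays_overlap(dim_id_list, id_list)
        if (ids.exists(tagIds.contains)) List((factId, tag)) else Nil
      }
    }
    (matches, overlapTests)
  }
}
```

With 10M fact rows, every additional tag group adds another 10M predicate evaluations, and each evaluation has to look at the row's array and the tag's id_list.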

Is there a more optimized approach I could follow?

Something like a (left) broadcast join on a hash of id_list and dim_id_list, just taking the first 'foo' or something like that?

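What you can do is explode the array (so that you get one row per item in dim_id_list) and then join on that value. You can then use distinct to remove the duplicates.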
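The same idea on plain Scala collections, as a sketch with hypothetical names (`ExplodeJoinModel`, `explodeJoinDistinct`) rather than Spark code: the first `flatMap` plays the role of `explode`, a `Map` lookup on the exploded id plays the role of the equi-join, and `distinct` removes the duplicate (id, dim_id_list, tag) rows. Because the join key is now a plain equality, Spark can use a hash-based join (broadcast hash or sort-merge) instead of testing every pair. Note that explode produces one row per array element, so with 10M rows and arrays of up to 200 ids the intermediate result can reach 2 × 10^9 rows in the worst case.

```scala
// Plain-Scala model of explode -> inner equi-join -> distinct.
object ExplodeJoinModel {
  def explodeJoinDistinct(
      facts: Seq[(Int, Seq[String])],
      dims: Seq[(String, String)] // (id, tag)
  ): Seq[(Int, Seq[String], String)] = {
    // Build side of the hash join: dim id -> tags (a list, in case ids repeat)
    val tagsById: Map[String, Seq[String]] =
      dims.groupBy(_._1).map { case (id, rows) => id -> rows.map(_._2) }
    facts
      // "explode": one row per element of the id array
      .flatMap { case (factId, ids) => ids.map(idTag => (factId, ids, idTag)) }
      // inner equi-join on the exploded id; unmatched ids produce no rows
      .flatMap { case (factId, ids, idTag) =>
        tagsById.getOrElse(idTag, Nil).map(tag => (factId, ids, tag))
      }
      // drop duplicate (id, dim_id_list, tag) rows, keeping first-seen order
      .distinct
  }
}
```

Like the Spark answer, this is an inner join: a fact row none of whose ids exist in dim_table disappears from the result.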

// simply generating the data for reproducibility
// (assumes a SparkSession named `spark`, as in spark-shell)
import spark.implicits._

val fact_df = Seq(
  1 -> Seq("a", "b"),
  2 -> Seq("a", "b", "c"),
  3 -> Seq("a", "b", "c", "d"),
  4 -> Seq("a", "b", "c", "d", "e"),
  5 -> Seq("a", "b", "c", "d", "e", "f")
).toDF("id", "dim_id_list")

val dim_df = Seq(
  "a" -> "john",
  "b" -> "foo",
  "c" -> "foo",
  "d" -> "foo",
  "f" -> "foo",
  "g" -> "bar",
  "h" -> "random",
  "i" -> "spark"
).toDF("id", "tag")

Solution:

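import org.apache.spark.sql.functions.explode

fact_df
  .withColumn("id_tag", explode($"dim_id_list"))                  // one row per element of dim_id_list
  .join(dim_df.select($"id".as("id_tag"), $"tag"), Seq("id_tag")) // equi-join (inner by default)
  .drop("id_tag")
  .distinct                                                        // remove duplicate (id, dim_id_list, tag) rows
  .orderBy("id")
  .show()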
+---+------------------+----+
| id|       dim_id_list| tag|
+---+------------------+----+
|  1|            [a, b]|john|
|  1|            [a, b]| foo|
|  2|         [a, b, c]|john|
|  2|         [a, b, c]| foo|
|  3|      [a, b, c, d]|john|
|  3|      [a, b, c, d]| foo|
|  4|   [a, b, c, d, e]| foo|
|  4|   [a, b, c, d, e]|john|
|  5|[a, b, c, d, e, f]|john|
|  5|[a, b, c, d, e, f]| foo|
+---+------------------+----+
