Looping over a large DataFrame in PySpark - alternatives


df_hrrchy
|lefId  |Lineage                               |
|-------|--------------------------------------|
|36326  |["36326","36465","36976","36091","82"]|
|36121  |["36121","36908","36976","36091","82"]|
|36380  |["36380","36465","36976","36091","82"]|
|36448  |["36448","36465","36976","36091","82"]|
|36683  |["36683","36465","36976","36091","82"]|
|36949  |["36949","36908","36976","36091","82"]|
|37349  |["37349","36908","36976","36091","82"]|
|37026  |["37026","36908","36976","36091","82"]|
|36879  |["36879","36465","36976","36091","82"]|
df_trans
|tranID     |   T_Id                                                                  |
|-----------|-------------------------------------------------------------------------|
|1000540    |["36121","36326","37349","36949","36380","37026","36448","36683","36879"]|

df_creds
|T_Id   |T_val  |T_Goal |Parent_T_Id    |Parent_Val      |parent_Goal|
|-------|-------|-------|---------------|----------------|-----------|
|36448  |100    |1      |36465          |200             |1          |
|36465  |200    |1      |36976          |300             |2          |
|36326  |90     |1      |36465          |200             |1          |
|36091  |500    |19     |82             |600             |4          |
|36121  |90     |1      |36908          |200             |1          |
|36683  |90     |1      |36465          |200             |1          |
|36908  |200    |1      |36976          |300             |2          |
|36949  |90     |1      |36908          |200             |1          |
|36976  |300    |2      |36091          |500             |19         |
|37026  |90     |1      |36908          |200             |1          |
|37349  |100    |1      |36908          |200             |1          |
|36879  |90     |1      |36465          |200             |1          |
|36380  |90     |1      |36465          |200             |1          |

Expected result

|T_id  |children                                  |T_Val|T_Goal|parent_T_id|parent_Goal|trans_id|
|------|------------------------------------------|-----|------|-----------|-----------|--------|
|36091 |["36976"]                                 |500  |19    |82         |4          |1000540 |
|36465 |["36448","36326","36683","36879","36380"] |200  |1     |36976      |2          |1000540 |
|36908 |["36121","36949","37026","37349"]         |200  |1     |36976      |2          |1000540 |
|36976 |["36465","36908"]                         |300  |2     |36091      |19         |1000540 |
|36683 |null                                      |90   |1     |36465      |1          |1000540 |
|37026 |null                                      |90   |1     |36908      |1          |1000540 |
|36448 |null                                      |100  |1     |36465      |1          |1000540 |
|36949 |null                                      |90   |1     |36908      |1          |1000540 |
|36326 |null                                      |90   |1     |36465      |1          |1000540 |
|36380 |null                                      |90   |1     |36465      |1          |1000540 |
|36879 |null                                      |90   |1     |36465      |1          |1000540 |
|36121 |null                                      |90   |1     |36908      |1          |1000540 |
|37349 |null                                      |100  |1     |36908      |1          |1000540 |

You definitely want to process the entire DataFrame in bulk rather than iterating over it row by row.

The key is to "reverse" df_hrrchy, i.e. to derive the list of children of each T_Id from the parent lineage:

val df_children = df_hrrchy.withColumn("children", slice($"Lineage", lit(1), size($"Lineage") - 1))
.withColumn("parents", slice($"Lineage", 2, 999999))
.select(explode(arrays_zip($"children", $"parents")).as("rels"))
.distinct
.groupBy($"rels.parents".as("T_Id"))
.agg(collect_set($"rels.children").as("children"))
df_children.show(false)
+-----+-----------------------------------+
|T_Id |children                           |
+-----+-----------------------------------+
|36091|[36976]                            |
|36465|[36448, 36380, 36326, 36879, 36683]|
|36976|[36465, 36908]                     |
|82   |[36091]                            |
|36908|[36949, 37349, 36121, 37026]       |
+-----+-----------------------------------+
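If you prefer to stay in PySpark, the same "reverse the lineage" step could look roughly like this (an untested sketch; the column-valued arguments to slice require Spark 3.1 or later):

from pyspark.sql import functions as F

# Pair each lineage entry with its immediate parent, then group by parent
# to collect that parent's distinct children.
df_children = (df_hrrchy
    .withColumn("children", F.slice("Lineage", F.lit(1), F.size("Lineage") - 1))
    .withColumn("parents", F.slice("Lineage", 2, 999999))
    .select(F.explode(F.arrays_zip("children", "parents")).alias("rels"))
    .distinct()
    .groupBy(F.col("rels.parents").alias("T_Id"))
    .agg(F.collect_set("rels.children").alias("children")))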

Then explode the list of T_Ids in df_trans, and also include all the T_Ids from the hierarchy:

val df_trans_map = df_trans.withColumn("T_Id", explode($"T_Id"))
.join(df_hrrchy, array_contains($"Lineage", $"T_Id"))
.select($"tranID", explode($"Lineage").as("T_Id"))
.distinct
df_trans_map.show(false)
+-------+-----+
|tranID |T_Id |
+-------+-----+
|1000540|36976|
|1000540|82   |
|1000540|36091|
|1000540|36465|
|1000540|36326|
|1000540|36121|
|1000540|36908|
|1000540|36380|
|1000540|36448|
|1000540|36683|
|1000540|36949|
|1000540|37349|
|1000540|37026|
|1000540|36879|
+-------+-----+
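The same step could be sketched in PySpark as follows (continuing with the F alias from the sketch above, and using expr for the array_contains join condition):

# Explode the transaction's T_Id list, attach every lineage that contains the
# T_Id, then explode the lineage so each (tranID, T_Id) pair becomes a row.
df_trans_map = (df_trans
    .withColumn("T_Id", F.explode("T_Id"))
    .join(df_hrrchy, F.expr("array_contains(Lineage, T_Id)"))
    .select("tranID", F.explode("Lineage").alias("T_Id"))
    .distinct())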

With that in place, it is just a simple join to get the final result:

df_trans_map.join(df_creds, Seq("T_Id"))
.join(df_children, Seq("T_Id"), "left_outer")
.show(false)
+-----+-------+-----+------+-----------+----------+-----------+-----------------------------------+
|T_Id |tranID |T_val|T_Goal|Parent_T_Id|Parent_Val|parent_Goal|children                           |
+-----+-------+-----+------+-----------+----------+-----------+-----------------------------------+
|36976|1000540|300  |2     |36091      |500       |19         |[36465, 36908]                     |
|36091|1000540|500  |19    |82         |600       |4          |[36976]                            |
|36465|1000540|200  |1     |36976      |300       |2          |[36448, 36380, 36326, 36879, 36683]|
|36326|1000540|90   |1     |36465      |200       |1          |null                               |
|36121|1000540|90   |1     |36908      |200       |1          |null                               |
|36908|1000540|200  |1     |36976      |300       |2          |[36949, 37349, 36121, 37026]       |
|36380|1000540|90   |1     |36465      |200       |1          |null                               |
|36448|1000540|100  |1     |36465      |200       |1          |null                               |
|36683|1000540|90   |1     |36465      |200       |1          |null                               |
|36949|1000540|90   |1     |36908      |200       |1          |null                               |
|37349|1000540|100  |1     |36908      |200       |1          |null                               |
|37026|1000540|90   |1     |36908      |200       |1          |null                               |
|36879|1000540|90   |1     |36465      |200       |1          |null                               |
+-----+-------+-----+------+-----------+----------+-----------+-----------------------------------+
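In PySpark this last step would simply be (assuming the df_trans_map and df_children sketches above):

# Attach credit details to every (tranID, T_Id) pair and left-join the children.
df_result = (df_trans_map
    .join(df_creds, "T_Id")
    .join(df_children, "T_Id", "left_outer"))
df_result.show(truncate=False)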

You need to rewrite this so that it uses the full cluster; using a localIterator means you are not making full use of the cluster for the shared work.
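For contrast, the pattern being replaced looks roughly like this (a hypothetical reconstruction, since the original loop was not shown): every row is shipped back to the driver and processed there while the executors sit idle.

# Anti-pattern: toLocalIterator() streams each row to the driver, so all the
# per-row work below runs single-threaded on the driver process.
for row in df_hrrchy.toLocalIterator():
    lineage = row["Lineage"]
    # ... per-row lookups against df_trans / df_creds happen on the driver ...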

The code below has not been run, because you did not provide a usable dataset to test against. If you do, I will run the code to make sure it is correct.

from pyspark.sql.functions import explode, collect_set, collect_list, expr, col

# Explode the transaction's T_Id list. I know this creates a lot of short-lived
# records, but the flip side is that the entire cluster does the work instead of the driver.
df_trans_expld = df_trans.select(df_trans.tranID, explode(df_trans.T_Id).alias("T_Id"))
# Explode the lineage of every hierarchy node.
df_hrrchy_expld = df_hrrchy.select(df_hrrchy.lefId, explode(df_hrrchy.Lineage).alias("Lineage"))
# Join on the exploded data (which acts as a filter) and collect the full set of
# hierarchy T_Ids touched by each transaction.
df_hy_set = (df_trans_expld
    .join(df_hrrchy_expld, df_hrrchy_expld.lefId == df_trans_expld.T_Id, "left")
    .groupBy("tranID")
    .agg(collect_set(col("Lineage")).alias("hierarchy_list")))
# Logic unchanged from here down.
df_childrens = (df_creds.join(df_hy_set, expr("array_contains(hierarchy_list, T_Id)"))
    .select("T_Id", "T_val", "T_Goal", "Parent_T_Id", "parent_Goal", "tranID")
    .groupBy("Parent_T_Id").agg(collect_list("T_Id").alias("children"))
)
df_filter_creds = (df_creds.join(df_hy_set, expr("array_contains(hierarchy_list, T_Id)"))
    .select("T_Id", "T_val", "T_Goal", "Parent_T_Id", "parent_Goal", "tranID")
)
df_nodemap = (df_filter_creds.alias("A")
    .join(df_childrens.alias("B"), col("A.T_Id") == col("B.Parent_T_Id"), "left")
    .select("A.T_Id", "B.children", "A.T_val", "A.T_Goal", "A.Parent_T_Id", "A.parent_Goal", "A.tranID")
)
# No need to append/union data, as it's now just one dataframe: df_nodemap

I would have to look into this a bit more, but I am fairly sure that (with your existing code) you are pulling all the data through the driver, which will really slow things down, whereas this version uses all the executors to do the work.

There may be another optimization that gets rid of the array_contains (and uses a join instead). I would have to look at the explain plan to see whether you could get more performance out of it. Don't forget that you are avoiding a shuffle here, so it may already be good enough.
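One way that optimization might look (a sketch, assuming the df_hy_set built above): explode hierarchy_list once and join on T_Id, which lets Spark plan a hash join instead of the nested-loop join that a non-equi array_contains predicate typically forces, at the cost of reintroducing a shuffle.

from pyspark.sql import functions as F

# Turn the array membership test into a plain equi-join key.
df_hy_exploded = df_hy_set.select("tranID", F.explode("hierarchy_list").alias("T_Id"))
df_filter_creds = df_creds.join(df_hy_exploded, "T_Id")

# Compare the physical plans of the two variants before choosing one.
df_filter_creds.explain()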
