组合Spark数据帧中链接在一起的行



我有一个数据帧,它的行通过各种合并相互连接。到目前为止,我已经将DF转换为下面的格式,在这里我做了一个groupBy";Merge_To"并将它们收集到一个阵列中,然后将其连接回我原来的DF。看起来像这样:

df1
+---+--------+---------+
|Ref|Merge_To|   Merges|
+---+--------+---------+
|  1|      N|[3, 2, 3]|
|  2|       1|[5, 4, 6]|
|  5|       2|   [8, 7]|
| 10|      N|   [9, 9]|
| 12|      N|     [13]|
| 14|      N|     [15]|
| 16|      18|     [17]|
| 17|      16|     [19]|
| 18|      N|     [16]|
| 19|      17|     [20]|
+---+--------+---------+ 

对于参考文献1,2,5和18,16,17,19,20,它们通过一条链合并在一起。这不是我以前通过groupBy捕捉到的。最终,我希望我的DF看起来像这样,这说明了合并链:

+---+--------+------------------------+
|Ref|Merge_To|                  Merges|
+---+--------+------------------------+
|  1|      N|[3, 2, 3, 5, 4, 6, 8, 7]|
| 10|      N|                  [9, 9]|
| 12|      N|                    [13]|
| 14|      N|                    [15]|
| 18|      N|        [16, 17, 19, 20]|
+---+--------+------------------------+

我已经尝试将df1连接到过滤后的df1本身;Merge_To"不是

val arrayCombineUDF = udf((a:Seq[String], b:Seq[String]) => a ++ b )
val df1Filter = df1.filter($"Merge_To" !== "\N").
select("Merge_To", "Merges").withColumnRenamed("Merge_To", "Chain_Ref").
withColumnRenamed("Merges", "Chain_Merges")
val df2 = df1.join(df1Filter, $"Ref" === $"Chain_Ref", "left").
withColumn("Merges", when($"Chain_Merges".isNotNull, arrayCombineUDF($"Merges", $"Chain_Merges")).
otherwise($"Merges")).
select("Ref", "Merge_To", "Merges")
df2
+---+--------+----------+------------------+
|Ref|Merge_To|Merge_From|            Merges|
+---+--------+----------+------------------+
|  1|      N|         3|[3, 2, 3, 5, 4, 6]|
|  2|       1|        N|   [5, 4, 6, 8, 7]|
|  5|       2|        N|            [8, 7]|
| 10|      N|         9|            [9, 9]|
| 12|      N|        13|              [13]|
| 14|      N|        N|              [15]|
| 16|      18|        N|          [17, 19]|
| 17|      16|        N|          [19, 20]|
| 18|      N|        N|          [16, 17]|
| 19|      17|        N|              [20]|
+---+--------+----------+------------------+

这给出了我想要的结果,但实际上只占合并链的一个级别。

我还试着把和上面相同的联接过程放在一个while循环中,试图让它重复联接。我还尝试将UDF与If语句一起使用,希望我可以将每一行按合并的类型进行排序,并使用这种方法组合成一个链。

注意:我知道数组并不明显,但我不介意,最后可以对其进行排序。

编辑这是DF 的原始版本

+---+--------+----------+
|Ref|Merge_To|Merge_From|
+---+--------+----------+
|  1|      N|         3|
|  2|       1|        N|
|  3|       1|        N|
|  4|       2|        N|
|  5|       2|        N|
|  6|       2|        N|
|  7|       5|        N|
|  8|       5|        N|
|  9|      10|        N|
| 10|      N|         9|
| 11|      N|        N|
| 12|      N|        13|
| 13|      N|        N|
| 14|      N|        N|
| 15|      14|        N|
| 16|      18|        N|
| 17|      16|        N|
| 18|      N|        N|
| 19|      17|        N|
| 20|      19|        N|
+---+--------+----------+

11条目似乎擅离职守。无论如何

查看您的基本数据,这是一个分层查询,可以在具有良好功能的传统RDBMS中解决,例如CONNECT BY子句或递归WITH视图。

这个尝试你已经在1级停止了,这是问题。此外,这个问题不能很好地解决Spark的并行化分区方法。分区内容在…上任何分区都可能有与您正在查找的集相关的数据对于

  1. 建议您在那里进行处理并查询Hive表,或者使用JDBC通过Spark进行读取。

  2. 您可以按照这种没有充分记录的方法进行模拟https://sqlandhadoop.com/how-to-implement-recursive-queries-in-spark/这里,这是我有时用于BI后台处理的内容。

  3. 如果你必须在Spark领域中这样做,那么使用graphFrames方法,但运行速度相当慢,如下所示,使用一个子集的数据,并稍微改变你的方法,看看你的想法:

导入org.apache.spark.rdd.rdd

import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._  
sc.setCheckpointDir("/checkpoints")
// Subset of your data
val rdd = sc.parallelize( Array(("A", 1, None), ("B", 2, Some(1)), ("C", 3, Some(1)), ("D", 4, Some(2)), ("E", 5, Some(2)), ("F", 6, Some(2)),
("G", 7, Some(5)), ("H", 8, Some(5)), ("X", 9, Some(10)), ("Y", 10, None), ("X2", 12, Some(13)), ("Y3", 13, None)  
))
val df = rdd.toDF("v", "c", "p")
val dfV = df.select($"c".as("id"))
val dfE = df
.withColumnRenamed("c", "src")
.withColumnRenamed("p", "dst")
val nGraph = GraphFrame(dfV, dfE)
dfE.cache()
dfV.cache()
val res = nGraph.connectedComponents.run()
val res2 = res.join(df, res("id") === df("c"), "inner")
val res3 = res2.filter("p is not null").groupBy("component").agg(collect_list("id") as "group")
val res4 = res3.join(res2, res3("component") === res2("component") && res2("p").isNull , "inner")
res4.select($"id", $"group").show(false)

退货:

+---+---------------------+
|id |group                |
+---+---------------------+
|1  |[2, 3, 4, 5, 6, 7, 8]|
|10 |[9]                  |
|13 |[12]                 |
+---+---------------------+

我的建议:在RDBMS中这样做。这种方法使用graphFrames需要很长时间。

最新更新