我有一个数据帧,它的行通过各种合并相互连接。到目前为止,我已经将DF转换为下面的格式,在这里我做了一个groupBy";Merge_To"并将它们收集到一个阵列中,然后将其连接回我原来的DF。看起来像这样:
df1
+---+--------+---------+
|Ref|Merge_To| Merges|
+---+--------+---------+
| 1| N|[3, 2, 3]|
| 2| 1|[5, 4, 6]|
| 5| 2| [8, 7]|
| 10| N| [9, 9]|
| 12| N| [13]|
| 14| N| [15]|
| 16| 18| [17]|
| 17| 16| [19]|
| 18| N| [16]|
| 19| 17| [20]|
+---+--------+---------+
对于参考文献1,2,5和18,16,17,19,20,它们通过一条链合并在一起。这不是我以前通过groupBy捕捉到的。最终,我希望我的DF看起来像这样,这说明了合并链:
+---+--------+------------------------+
|Ref|Merge_To| Merges|
+---+--------+------------------------+
| 1| N|[3, 2, 3, 5, 4, 6, 8, 7]|
| 10| N| [9, 9]|
| 12| N| [13]|
| 14| N| [15]|
| 18| N| [16, 17, 19, 20]|
+---+--------+------------------------+
我已经尝试将df1连接到过滤后的df1本身;Merge_To"不是
val arrayCombineUDF = udf((a:Seq[String], b:Seq[String]) => a ++ b )
val df1Filter = df1.filter($"Merge_To" !== "\N").
select("Merge_To", "Merges").withColumnRenamed("Merge_To", "Chain_Ref").
withColumnRenamed("Merges", "Chain_Merges")
val df2 = df1.join(df1Filter, $"Ref" === $"Chain_Ref", "left").
withColumn("Merges", when($"Chain_Merges".isNotNull, arrayCombineUDF($"Merges", $"Chain_Merges")).
otherwise($"Merges")).
select("Ref", "Merge_To", "Merges")
df2
+---+--------+----------+------------------+
|Ref|Merge_To|Merge_From| Merges|
+---+--------+----------+------------------+
| 1| N| 3|[3, 2, 3, 5, 4, 6]|
| 2| 1| N| [5, 4, 6, 8, 7]|
| 5| 2| N| [8, 7]|
| 10| N| 9| [9, 9]|
| 12| N| 13| [13]|
| 14| N| N| [15]|
| 16| 18| N| [17, 19]|
| 17| 16| N| [19, 20]|
| 18| N| N| [16, 17]|
| 19| 17| N| [20]|
+---+--------+----------+------------------+
这给出了我想要的结果,但实际上只占合并链的一个级别。
我还试着把和上面相同的联接过程放在一个while循环中,试图让它重复联接。我还尝试将UDF与If语句一起使用,希望我可以将每一行按合并的类型进行排序,并使用这种方法组合成一个链。
注意:我知道数组并不明显,但我不介意,最后可以对其进行排序。
编辑这是DF 的原始版本
+---+--------+----------+
|Ref|Merge_To|Merge_From|
+---+--------+----------+
| 1| N| 3|
| 2| 1| N|
| 3| 1| N|
| 4| 2| N|
| 5| 2| N|
| 6| 2| N|
| 7| 5| N|
| 8| 5| N|
| 9| 10| N|
| 10| N| 9|
| 11| N| N|
| 12| N| 13|
| 13| N| N|
| 14| N| N|
| 15| 14| N|
| 16| 18| N|
| 17| 16| N|
| 18| N| N|
| 19| 17| N|
| 20| 19| N|
+---+--------+----------+
11条目似乎擅离职守。无论如何
查看您的基本数据,这是一个分层查询,可以在具有良好功能的传统RDBMS中解决,例如CONNECT BY子句或递归WITH视图。
这个尝试你已经在1级停止了,这是问题。此外,这个问题不能很好地解决Spark的并行化分区方法。分区内容在…上任何分区都可能有与您正在查找的集相关的数据对于
-
建议您在那里进行处理并查询Hive表,或者使用JDBC通过Spark进行读取。
-
您可以按照这种没有充分记录的方法进行模拟https://sqlandhadoop.com/how-to-implement-recursive-queries-in-spark/这里,这是我有时用于BI后台处理的内容。
-
如果你必须在Spark领域中这样做,那么使用graphFrames方法,但运行速度相当慢,如下所示,使用一个子集的数据,并稍微改变你的方法,看看你的想法:
导入org.apache.spark.rdd.rdd
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.graphframes._
sc.setCheckpointDir("/checkpoints")
// Subset of your data
val rdd = sc.parallelize( Array(("A", 1, None), ("B", 2, Some(1)), ("C", 3, Some(1)), ("D", 4, Some(2)), ("E", 5, Some(2)), ("F", 6, Some(2)),
("G", 7, Some(5)), ("H", 8, Some(5)), ("X", 9, Some(10)), ("Y", 10, None), ("X2", 12, Some(13)), ("Y3", 13, None)
))
val df = rdd.toDF("v", "c", "p")
val dfV = df.select($"c".as("id"))
val dfE = df
.withColumnRenamed("c", "src")
.withColumnRenamed("p", "dst")
val nGraph = GraphFrame(dfV, dfE)
dfE.cache()
dfV.cache()
val res = nGraph.connectedComponents.run()
val res2 = res.join(df, res("id") === df("c"), "inner")
val res3 = res2.filter("p is not null").groupBy("component").agg(collect_list("id") as "group")
val res4 = res3.join(res2, res3("component") === res2("component") && res2("p").isNull , "inner")
res4.select($"id", $"group").show(false)
退货:
+---+---------------------+
|id |group |
+---+---------------------+
|1 |[2, 3, 4, 5, 6, 7, 8]|
|10 |[9] |
|13 |[12] |
+---+---------------------+
我的建议:在RDBMS中这样做。这种方法使用graphFrames需要很长时间。