Suppose a DataFrame has two columns, C1 and C2:
+---+---+
| C1| C2|
+---+---+
|  A|  B|
|  C|  D|
|  A|  E|
|  E|  F|
+---+---+
My goal is to collect all transitively linked values into one array per group:
+-------------+
|intersections|
+-------------+
| [A, B, E, F]|
|       [C, D]|
+-------------+
How can this be done efficiently when the DataFrame has a very large number of rows (~1 billion)?
The solution is the GraphFrames library (https://graphframes.github.io/graphframes/docs/_site/index.html).
Disclaimer: tested with Spark 2.4.4 and GraphFrames 0.7.0.
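For completeness, here is a minimal sketch of the build setup, assuming sbt and the Scala 2.11 artifact published on the spark-packages repository (the exact coordinates and resolver URL are assumptions and depend on your Spark/Scala versions):
// build.sbt: a minimal sketch; coordinates are assumptions for Spark 2.4 / Scala 2.11
resolvers += "SparkPackages" at "https://repos.spark-packages.org/"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"   % "2.4.4" % "provided",
  "graphframes"      %  "graphframes" % "0.7.0-spark2.4-s_2.11"
)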
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.graphframes.GraphFrame
object SparkApp extends App {

  val appName = "appName"
  val master = "local[*]"

  val spark = SparkSession
    .builder
    .appName(appName)
    .master(master)
    .getOrCreate

  import spark.implicits._

  val dataTest =
    Seq(
      ("A", "B"),
      ("C", "D"),
      ("A", "E"),
      ("E", "F")
    ).toDF("C1", "C2")

  // a checkpoint directory is mandatory for GraphFrames' connectedComponents
  spark.sparkContext.setCheckpointDir("/some/path/hdfs/test_checkpoints")

  // turn the DataFrame into a graph: distinct values become the vertices,
  // and each row becomes an edge between its two values
  val graphTest: GraphFrame =
    GraphFrame(
      dataTest.select('C1 as "id").union(dataTest.select('C2 as "id")).distinct,
      dataTest.withColumnRenamed("C1", "src").withColumnRenamed("C2", "dst")
    )

  // each connected component is one group of transitively linked values
  val graphComponentsTest = graphTest.connectedComponents.run()

  // collect the members of each component into an array
  val clustersResultTestDF =
    graphComponentsTest
      .groupBy("component")
      .agg(collect_list("id") as "intersections")

  clustersResultTestDF.show
}
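Note that the arrays in the output below happen to come back sorted, but collect_list gives no ordering guarantee. If deterministic output matters, the aggregation can be wrapped in sort_array (a standard Spark SQL function); a sketch:
val clustersSortedDF =
  graphComponentsTest
    .groupBy("component")
    .agg(sort_array(collect_list("id")) as "intersections")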
Output:
+-------------+
|intersections|
+-------------+
| [A, B, E, F]|
|       [C, D]|
+-------------+
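At the ~1 billion row scale, the connectedComponents runner exposes a couple of knobs worth experimenting with. The snippet below is a sketch; the option values are illustrative assumptions, not tested recommendations:
// a sketch of tuning options on the ConnectedComponents runner
val componentsTuned =
  graphTest.connectedComponents
    .setAlgorithm("graphx")    // switch to the GraphX-based implementation
    .setCheckpointInterval(2)  // checkpoint every 2 iterations to keep lineage short
    .run()
Since the algorithm is iterative, persisting the edge DataFrame before running it can also help.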