Apache Spark:收集到一个数组的交集



假设一个数据框有两列:C1和C2

+---+-----+
|C1 | C2  |
+---+-----+
|A  |  B  |
|C  |  D  |
|A  |  E  |
|E  |  F  |
+---+-----+

我的目标是:收集到在数组交叉点

+--------------+
| intersections|
+--------------+
|[A, B, E, F]  |
|[C, D]        |
+--------------+

如果数据帧有大量的行(~ 10亿),如何做得好

解决方案是GraphFrame库(https://graphframes.github.io/graphframes/docs/_site/index.html)

免责声明:测试Spark 2.4.4和GraphFrame 0.7.0

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.storage.StorageLevel
import scala.collection._
import org.graphframes.GraphFrame
object SparkApp extends App {
val appName = "appName"
val master = "local[*]"

val spark = SparkSession
.builder
.appName(appName)
.master(master)
.getOrCreate

import spark.implicits._
val dataTest =
Seq(
("A", "B"),
("C", "D"),
("A", "E"),
("E", "F")
).toDF("C1", "C2")
// it's mandatory for GraphFrame
spark.sparkContext.setCheckpointDir("/some/path/hdfs/test_checkpoints")
// dataframe to list of vertices and connections list
val graphTest: GraphFrame = 
GraphFrame(
dataTest.select('C1 as "id").union(dataTest.select('C2 as "id")).distinct, 
dataTest.withColumnRenamed("C1", "src").withColumnRenamed("C2","dst")
)
val graphComponentsTest = graphTest.connectedComponents.run()
val clustersResultTestDF = 
graphComponentsTest
.groupBy("component")
.agg(collect_list("id") as "intersections")

clustersResultTestDF.show
}

输出
+--------------+
| intersections|
+--------------+
|[A, B, E, F]  |
|[C, D]        |
+--------------+

相关内容

  • 没有找到相关文章

最新更新