如何将DataFrame映射到EdgeRDD



我有一个类似于的DataFrame

val data = sc.parallelize(Array((1,10,10,7,7),(2,7,7,7,8),(3, 5,5,6,8))).toDF("id","col1","col2","col3","col4")

我想做的是创建一个EdgeRDD,如果两个id在列中的至少一列中共享相同的值,则它们共享一个链接

id col1 col2 col3 col4
 1   10   10    7    7
 2    7    7    7    8
 3    5    5    6    8

则节点1和2具有无向链路1-2,因为它们在col3中共享一个公共值。

出于同样的原因,节点2和3共享一个无向链路,因为它们在col4 中共享一个公共值

我知道如何以一种丑陋的方式解决这个问题(但在我的真实案例中,我有太多的专栏无法采用这种策略)

val data2 = data.withColumnRenamed("id", "idd").withColumnRenamed("col1", "col1d").withColumnRenamed("col2", "col2d").withColumnRenamed("col3", "col3d").withColumnRenamed("col4", "col4d")
val res = data.join(data2, data("id") < data2("idd")
                    && (data("col1") === data2("col1d")
                    || data("col2") === data2("col2d")
                    || data("col3") === data2("col3d")
                    || data("col4") === data2("col4d")))
                                              //> res  : org.apache.spark.sql.DataFrame = [id: int, col1: int, col2: int, col
                                              //| 3: int, col4: int, idd: int, col1d: int, col2d: int, col3d: int, col4d: int
                                              //| ]
res.show                                      //> +---+----+----+----+----+---+-----+-----+-----+-----+
                                              //| | id|col1|col2|col3|col4|idd|col1d|col2d|col3d|col4d|
                                              //| +---+----+----+----+----+---+-----+-----+-----+-----+
                                              //| |  1|  10|  10|   7|   7|  2|    7|    7|    7|    8|
                                              //| |  2|   7|   7|   7|   8|  3|    5|    5|    6|    8|
                                              //| +---+----+----+----+----+---+-----+-----+-----+-----+
                                              //| 
val links = EdgeRDD.fromEdges(res.map(row => Edge(row.getAs[Int]("id").toLong, row.getAs[Int]("idd").toLong, "indirect")))
                                              //> links  : org.apache.spark.graphx.impl.EdgeRDDImpl[String,Nothing] = EdgeRDD
                                              //| Impl[27] at RDD at EdgeRDD.scala:42
links.foreach(println)                        //> Edge(1,2,indirect)
                                              //| Edge(2,3,indirect)

如何解决更多列的问题?

你的意思是这样的吗?

val expr = data.columns.diff(Seq("id"))
  .map(c => data(c) === data2(s"${c}d"))
  .reduce(_ || _)
data.join(data2, data("id") < data2("idd") && expr)

你也可以使用别名

import org.apache.spark.sql.functions.col
val expr = data.columns.diff(Seq("id"))
  .map(c => col(s"d1.$c") === col(s"d2.$c"))
  .reduce(_ || _)
data.alias("d1").join(data.alias("d2"), col("d1.id") < col("d2.id") && expr)

您可以通过简单的select$等效于col,但需要导入sqlContext.implicits.StringToColumn)来轻松地执行这些操作

.select($"id".cast("long"), $"idd".cast("long"))

.select($"d1.id".cast("long"), $"d2.id".cast("long"))

和模式匹配:

.rdd.map { case Row(src: Long, dst: Long) => Edge(src, dst, "indirect") }

只需注意,像这样的逻辑析取不能优化,被扩展为笛卡尔乘积,然后是filter。如果你想避免,你可以尝试用不同的方式来解决这个问题。

让我们从从从宽到长的数据重塑开始:

val expr = explode(array(data.columns.tail.map(
  c => struct(lit(c).alias("column"), col(c).alias("value"))
): _*))
val long = data.withColumn("tmp", expr)
  .select($"id", $"tmp.column", $"tmp.value")

这将为我们提供一个具有以下模式的DataFrame

long.printSchema
// root
//  |-- id: integer (nullable = false)
//  |-- column: string (nullable = false)
//  |-- value: integer (nullable = false)

有了这样的数据,您可以有多种选择,包括优化的join:

val pairs = long.as("long1")
  .join(long.as("long2"),
    $"long1.column" === $"long2.column" &&  // Optimized
    $"long1.value" === $"long2.value" &&  // Optimized
    $"long1.id" < $"long2.id" // Not optimized - filtered after sort-merge join
  )
  // Select only ids
  .select($"long1.id".alias("src"), $"long2.id".alias("dst"))
  // And keep distict
  .distinct
pairs.show
// +---+---+
// |src|dst|
// +---+---+
// |  1|  2|
// |  2|  3|
// +---+---+

这可以通过使用不同的哈希技术来进一步改进,以避免爆炸生成大量记录。

您也可以将这个问题视为二分图,其中观察属于节点类别,属性值对属于另一个节点类别。

sealed trait CustomNode
case class Record(id: Long) extends CustomNode
case class Property(name: String, value: Int) extends CustomNode

以此为起点,可以使用long生成以下类型的边:

Record -> Property

并使用GraphX直接通过搜索类似的路径来解决这个问题

Record -> Property <- Record

提示:收集每个属性的邻居并传播回来。

和以前一样,您应该考虑使用哈希或bucket来减少生成的Property节点的数量限制。

最新更新