Convert a bipartite graph to an adjacency matrix in Spark Scala



I am trying to convert an edge list in the following format

data = [('a', 'developer'),
        ('b', 'tester'),
        ('b', 'developer'),
        ('c', 'developer'),
        ('c', 'architect')]

where the adjacency matrix would take the following form:

      developer     tester    architect
 a        1            0          0
 b        1            1          0
 c        1            0          1

I would like to store the matrix in the following format:

 1    0    0
 1    1    0
 1    0    1

I have tried using GraphX:

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD

// Hash each label to a Long so it can serve as a GraphX vertex id
def pageHash(title: String) = title.toLowerCase.replace(" ", "").hashCode.toLong

val edges: RDD[Edge[String]] = sc.textFile("/user/query.csv").map { line =>
  val row = line.split(",")
  Edge(pageHash(row(0)), pageHash(row(1)), "1")
}
val graph: Graph[Int, String] = Graph.fromEdges(edges, defaultValue = 1)

I am able to create the graph, but I cannot convert it to an adjacency matrix representation.

One possible approach is:

  1. Convert the RDD to a DataFrame:

    val rdd = sc.parallelize(Seq(
      ("a", "developer"), ("b", "tester"), ("b", "developer"),
      ("c","developer"), ("c", "architect")))
    val df = rdd.toDF("row", "col")
    
  2. Index the columns (the resulting label-to-index mapping can be inspected as sketched after this list):

    import org.apache.spark.ml.feature.StringIndexer
    val indexers = Seq("row", "col").map(x =>
      new StringIndexer().setInputCol(x).setOutputCol(s"${x}_idx").fit(df)
    )
    
  3. Transform the data and create an RDD[MatrixEntry]:

    import org.apache.spark.sql.functions.lit
    import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, CoordinateMatrix}
    
    val entries = indexers.foldLeft(df)((df, idx) => idx.transform(df))
      .select($"row_idx", $"col_idx", lit(1.0))
      .as[MatrixEntry]  // Spark 1.6. For < 1.5 map manually
      .rdd
    
  4. Create the matrix:

    new CoordinateMatrix(entries)
    

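Note that StringIndexer assigns indices by descending label frequency, so the row and column positions in the resulting matrix will generally not match the alphabetical order shown in the question. A small sketch for inspecting the fitted label-to-index mappings, assuming the `indexers` built in step 2:

    // Print which original label was assigned which numeric index.
    // StringIndexerModel.labels is ordered by index: labels(0) -> 0.0, labels(1) -> 1.0, ...
    indexers.foreach { model =>
      val mapping = model.labels.zipWithIndex
        .map { case (label, idx) => s"$label -> $idx" }
        .mkString(", ")
      println(s"${model.getOutputCol}: $mapping")
    }
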
This matrix can be further converted to any other type of distributed matrix, including RowMatrix and IndexedRowMatrix.
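
For the plain `1 0 0` row layout asked for in the question, one option is to go through IndexedRowMatrix and write each row's dense values out as text. A minimal sketch, assuming the `entries` RDD from step 3 (the output path is a placeholder):

    import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, IndexedRowMatrix}

    val matrix = new CoordinateMatrix(entries)

    // IndexedRowMatrix keeps the row index, so rows can be restored to order.
    val indexed: IndexedRowMatrix = matrix.toIndexedRowMatrix()

    // One line of space-separated 0/1 values per row; row and column positions
    // follow the indices assigned by StringIndexer, not the original label order.
    val lines = indexed.rows
      .sortBy(_.index)
      .map(_.vector.toArray.map(_.toInt).mkString(" "))

    lines.collect().foreach(println)
    // lines.saveAsTextFile("/user/adjacency_matrix")  // hypothetical output path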
