I am trying to convert an edge list in the following format:
data = [('a', 'developer'),
('b', 'tester'),
('b', 'developer'),
('c', 'developer'),
('c', 'architect')]
where the adjacency matrix would take the form:

   developer  tester  architect
a      1        0        0
b      1        1        0
c      1        0        1
I want to store the matrix in the following format:
1 0 0
1 1 0
1 0 1
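For reference, the transformation I am after can be sketched in plain Scala without Spark (a minimal sketch; `rows` and `cols` keep the order of first appearance in the data):

```scala
// Build a 0/1 adjacency matrix from the edge list.
// Row/column order follows first appearance in the data.
val data = Seq(("a", "developer"), ("b", "tester"), ("b", "developer"),
               ("c", "developer"), ("c", "architect"))

val rows  = data.map(_._1).distinct   // a, b, c
val cols  = data.map(_._2).distinct   // developer, tester, architect
val edges = data.toSet

val matrix = rows.map(r => cols.map(c => if (edges((r, c))) 1 else 0))
// matrix: Seq(Seq(1, 0, 0), Seq(1, 1, 0), Seq(1, 0, 1))
```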
I have already tried using GraphX:
def pageHash(title: String) = title.toLowerCase.replace(" ", "").hashCode.toLong

val edges: RDD[Edge[String]] = sc.textFile("/user/query.csv").map { line =>
  val row = line.split(",")
  Edge(pageHash(row(0)), pageHash(row(1)), "1")
}
val graph: Graph[Int, String] = Graph.fromEdges(edges, defaultValue = 1)
I am able to create the graph, but I cannot convert it to an adjacency-matrix representation.
One possible way to do it:
- Convert the RDD to a DataFrame:

  val rdd = sc.parallelize(Seq(
    ("a", "developer"), ("b", "tester"), ("b", "developer"),
    ("c", "developer"), ("c", "architect")))
  val df = rdd.toDF("row", "col")
- Index the columns:

  import org.apache.spark.ml.feature.StringIndexer

  val indexers = Seq("row", "col").map(x =>
    new StringIndexer().setInputCol(x).setOutputCol(s"${x}_idx").fit(df)
  )
- Transform the data and create an RDD[MatrixEntry] (note the `lit` import lives in `org.apache.spark.sql.functions`):

  import org.apache.spark.sql.functions.lit
  import org.apache.spark.mllib.linalg.distributed.{MatrixEntry, CoordinateMatrix}

  val entries = indexers.foldLeft(df)((df, idx) => idx.transform(df))
    .select($"row_idx", $"col_idx", lit(1.0))
    .as[MatrixEntry] // Spark 1.6. For < 1.6 map manually
    .rdd
- Create the matrix:

  new CoordinateMatrix(entries)
This matrix can be further converted to any other type of distributed matrix, including RowMatrix and IndexedRowMatrix.
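If you need the dense row layout shown in the question, one way (a sketch, assuming the matrix fits in driver memory) is to go through IndexedRowMatrix and collect. Keep in mind that StringIndexer assigns indices by label frequency, so the row/column order may differ from the original label order:

```scala
// Sketch: collect the CoordinateMatrix as dense 0/1 rows on the driver.
// Assumes the matrix is small enough to fit in driver memory.
val mat = new CoordinateMatrix(entries)

mat.toIndexedRowMatrix.rows
  .collect()
  .sortBy(_.index)
  .map(_.vector.toArray.map(_.toInt).mkString(" "))
  .foreach(println)
```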