我是spark的新手,我想实现PageRank算法。
例如,我有一个页面链接文件,上下文如下:
A B,C
B A,D
C A
D A,B
E A,B,D
我想把它转换成:
(A,List(B,C))
(B,List(A,D))
(C,List(A))
(D,List(A,B))
(E,List(A,B,D))
我不知道如何使用地图功能有人能教我怎么做吗?
import spark.implicits._
import org.apache.spark.sql.functions._
val sourceData = spark
.read
.text("/path/example.txt")
.toDF("value")
sourceData.printSchema()
// root
// |-- value: string (nullable = true)
val df1 = sourceData
.select(
split('value, " ").getItem(0).as("col1"),
split('value, " ").getItem(1).as("col2"))
df1.printSchema()
// root
// |-- col1: string (nullable = true)
// |-- col2: string (nullable = true)
df1.show(false)
// +----+-----+
// |col1|col2 |
// +----+-----+
// |A |B,C |
// |B |A,D |
// |C |A |
// |D |A,B |
// |E |A,B,D|
// +----+-----+
val df2 = df1.select('col1, split('col2, ",").as("col2"))
df2.printSchema
// root
// |-- col1: string (nullable = true)
// |-- col2: array (nullable = true)
// | |-- element: string (containsNull = true)
df2.show(false)
// +----+---------+
// |col1|col2 |
// +----+---------+
// |A |[B, C] |
// |B |[A, D] |
// |C |[A] |
// |D |[A, B] |
// |E |[A, B, D]|
// +----+---------+
V2:地图
val r1 = spark.read.text("/path/example.txt")
val r2 = r1.map(x => x.getString(0).split(" ")).map(i => (i(0), i(1).split(",")))