我通过另一个过程生成配对的rdd/df,但这里是生成数据集的代码,以帮助调试过程。
下面是示例i/p文件(/scratch/test2.txt):1本书1作者12书2作者21 book3 author2 3.30 下面是生成数据帧 的代码case class RefText (index: Int, description: String, fName: String, weight: Double)
val annotation_split = sc.textFile("/scratch/test2.txt").map(_.split("t"))
val annotation = annotation_split.map{line => RefText(line(0).toInt, line(1), line(2), line(3).toDouble)}.toDF()
val getConcatenated = udf( (first: String, second: String, third: Double) => { first + "#" + second + "#" + third.toString} )
val annotate_concated = annotation.withColumn("annotation",getConcatenated(col("description"), col("fName"), col("weight"))).select("index","annotation")
annotate_concated.show()
+-----+-----------------+
|index| annotation|
+-----+-----------------+
| 1|book1#author1#1.1|
| 2|book2#author2#2.2|
| 1|book3#author2#3.3|
+-----+-----------------+
//Here is how I generate pairedrdd.
val paired_rdd : PairRDDFunctions[String, String] = annotate_concated.rdd.map(row => (row.getString(0), row.getString(1)))
val df = paired_rdd.reduceByKey { case (val1, val2) => val1 + "|" + val2 }.toDF("user_id","description")
这是我的数据框架的示例数据,列描述具有以下格式(text1#text2#weight | text1#text2#weight|....)
user1book1 # author1 # 0.07841217886795074 | 1.27044260397331488 tool1 # desc1 # | song1 # album1 # -2.052661673730870676 | -0.005683148395350108 item1 # category1 #
user2book2 # author1 # 4.07841217886795074 | | song2 tool2 # desc1 # -1.27044260397331488 # album1 # 2.052661673730870676 | -0.005683148395350108第二条# category1 #
我想根据权重降序对描述列进行排序。
期望的o/p为:
user1tool1 # desc1 # 1.27044260397331488 | 0.07841217886795074 book1 # author1 # | item1 # category1 # -0.005683148395350108 | -2.052661673730870676 song1 # album1 #
user2book2 4.07841217886795074 | song2 # album1 # 2.052661673730870676 # author1 # | tool2 # desc1 # -1.27044260397331488 | -0.005683148395350108第二条# category1 #
我认为没有一种直接的方法来重新排序单元格内的值。我会亲自提前下单,即在annotation_split
rdd上下单。
这里是一个例子(我不得不改变代码一点,使其工作)。HDFS上的文件(使用正则空格和@作为分隔符):
1 book1 author1 1.10 @ 2 book2 author2 2.20 @ 1 book3 author2 3.30
:
case class RefText (index: Int, description: String, fName: String, weight: Double)
// split by line, then split line into columns
val annotation_split = sc.textFile(path).flatMap(_.split(" @ ")).map{_.split(" ")}
// HERE IS THE TRICK: sort the lines in descending order
val annotation_sorted = annotation_split
.map(line => (line.last.toFloat,line))
.sortByKey(false)
.map(_._2)
// back to your code
val annotation = annotation_sorted.map{line => RefText(line(0).toInt, line(1), line(2), line(3).toDouble)}.toDF()
val getConcatenated = udf( (first: String, second: String, third: Double) => { first + "#" + second + "#" + third.toString} )
val annotate_concated = annotation.withColumn("annotation",getConcatenated(col("description"), col("fName"), col("weight"))).select("index","annotation")
// note: here, I replaced row.getString(0) by row.getInt(0) to avoid cast exception
val paired_rdd = annotate_concated.rdd.map(row => (row.getInt(0), row.getString(1)))
val df = paired_rdd.reduceByKey { case (val1, val2) => val1 + "|" + val2 }.toDF("user_id","description")
唯一的问题是,考虑到你的并行度,排序之后可能会混淆。另一种方法是映射每一列,并以排序的方式重写它(拆分、排序、连接)。