我正在使用Spark 1.5.0,我有这个问题:
val df = paired_rdd.reduceByKey {
case (val1, val2) => val1 + "|" + val2
}.toDF("user_id","description")
下面是df的示例数据,正如您可以看到的列描述所示(text1#text3#weight | text1#text3#weight|....)
0.07841217886795074 book1 # author1 # | tool1 # desc1 # 0.27044260397331488 | -0.052661673730870676 song1 # album1 # | item1 # category1 #
-0.005683148395350108
我想对这个df按权重降序排序,下面是我尝试的:
首先在"|"处拆分内容然后对每个字符串在"#"处拆分得到第三个字符串也就是weight然后将其转换为double值
val getSplitAtWeight = udf((str: String) => {
str.split("|").foreach(_.split("#")(2).toDouble)
})
根据udf返回的权重值排序(降序排序)
val df_sorted = df.sort(getSplitAtWeight(col("description")).desc)
我得到以下错误:
线程"main"异常java.lang.UnsupportedOperationException:不支持Unit类型的架构org.apache.spark.sql.catalyst.ScalaReflection class.schemaFor美元(ScalaReflection.scala: 153)在org.apache.spark.sql.catalyst.ScalaReflection .schemaFor美元(ScalaReflection.scala: 29)在org.apache.spark.sql.catalyst.ScalaReflection class.schemaFor美元(ScalaReflection.scala: 64)在org.apache.spark.sql.catalyst.ScalaReflection .schemaFor美元(ScalaReflection.scala: 29)org.apache.spark.sql.functions .udf美元(functions.scala: 2242)
将udf
中的foreach
更改为map
,如下所示将消除异常:
def getSplitAtWeight = udf((str: String) => {
str.split('|').map(_.split('#')(2).toDouble)
})
你的方法的问题是List
上的foreach
方法不返回任何东西,即,它的结果是类型Unit
,这就是为什么你得到Exception
。要了解foreach
的更多信息,请查看此博客