我一直在使用Scala+Spark和MLib教程的电影推荐。
在获得我的预测后,我需要每个用户的前3项。
val predictions =
model.predict(usersProducts).map { case Rating(user, product, rate) =>
(user, product, rate)
}
我试过这个:
def myPrint(x:(Int, Int, Double)) = println(x._1 + ":" + x._2 + " - " +x._3)
predictions.collect().sortBy(- _._3).groupBy(_._1).foreach( t2 => t2._2.take(3).foreach(myPrint) )
(_.1是用户,_.2是项目,_.3是费率)
我必须添加"collect()"方法才能使其工作,但我无法序列化此任务。顺便说一句,我添加了myPrint方法,因为我不知道如何从最后一行获得集合或地图。
有什么想法可以让它序列化吗?
想从最后一行得到一个集合/地图吗?
如果我不能做得更好,在myPrint中,我会在数据库中写入,并在插入1000后提交。
谢谢。
您可以通过稍微修改您的方法来确保所有计算都在RDD中完成:
predictions.sortBy(- _.rating).groupBy(_.user)
.flatMap(_._2.take(3)).foreach(println)
调用方法的任务必须序列化包含该方法的对象。尝试使用函数值:
val myPrint: ((Int, Int, Double)) => Unit = x => ...
您不希望在一开始就使用collect()
,这会破坏使用Spark的全部意义。
我不明白你在说什么"获取收藏/地图"。.take(3)
已返回集合。
在阅读了lmm的答案并进行了一些研究后,我用这种方式解决了我的问题:
首先,我开始使用Rating对象而不是Tuples:
val predictions = model.predict(usersProducts)
然后我定义函数值如下,现在我在这里做"take":
def myPrint: ((Int, Iterable[Rating])) => Unit = x => x._2.take(3).foreach(println)
所以,现在我把所有东西都这样混合:
predictions.sortBy(- _.rating).groupBy(_.user).foreach(myPrint)