scala+spark:序列化任务以获得推荐



我一直在使用Scala+Spark和MLib教程的电影推荐。

在获得我的预测后,我需要每个用户的前3项。

val predictions = 
  model.predict(usersProducts).map { case Rating(user, product, rate) =>
  (user, product, rate)
}

我试过这个:

def myPrint(x:(Int, Int, Double)) = println(x._1 + ":" + x._2 + " - " +x._3)
predictions.collect().sortBy(- _._3).groupBy(_._1).foreach( t2 => t2._2.take(3).foreach(myPrint) )

(_.1是用户,_.2是项目,_.3是费率)

我必须添加"collect()"方法才能使其工作,但我无法序列化此任务。顺便说一句,我添加了myPrint方法,因为我不知道如何从最后一行获得集合或地图。

有什么想法可以让它序列化吗?

想从最后一行得到一个集合/地图吗?

如果我不能做得更好,在myPrint中,我会在数据库中写入,并在插入1000后提交。

谢谢。

您可以通过稍微修改您的方法来确保所有计算都在RDD中完成:

predictions.sortBy(- _.rating).groupBy(_.user)
  .flatMap(_._2.take(3)).foreach(println)

调用方法的任务必须序列化包含该方法的对象。尝试使用函数值:

val myPrint: ((Int, Int, Double)) => Unit = x => ...

您不希望在一开始就使用collect(),这会破坏使用Spark的全部意义。

我不明白你在说什么"获取收藏/地图"。.take(3)已返回集合。

在阅读了lmm的答案并进行了一些研究后,我用这种方式解决了我的问题:

首先,我开始使用Rating对象而不是Tuples:

val predictions = model.predict(usersProducts)

然后我定义函数值如下,现在我在这里做"take":

def myPrint: ((Int, Iterable[Rating])) => Unit = x => x._2.take(3).foreach(println)

所以,现在我把所有东西都这样混合:

predictions.sortBy(- _.rating).groupBy(_.user).foreach(myPrint)

最新更新