我正在使用此处提供的MovieLens数据集为电影做一个推荐系统:http://grouplens.org/datasets/movielens/
为了计算这个推荐系统,我在scala中使用了Flink的ML库,特别是ALS算法(org.apache.flink.ml.recommendation.ALS
)。
我首先将电影的评级映射到DataSet[(Int, Int, Double)]
,然后创建一个trainingSet
和一个testSet
(请参阅下面的代码)。
我的问题是,当我将ALS.fit
函数与整个数据集(所有评级)一起使用时没有错误,但是如果我只删除一个评级,fit 函数将不再起作用,我不明白为什么。
你有什么想法吗?:)
使用的代码:
Rating.scala
case class Rating(userId: Int, movieId: Int, rating: Double)
PreProcessing.scala
object PreProcessing {
def getRatings(env : ExecutionEnvironment, ratingsPath : String): DataSet[Rating] = {
env.readCsvFile[(Int, Int, Double)](
ratingsPath, ignoreFirstLine = true,
includedFields = Array(0,1,2)).map{r => new Rating(r._1, r._2, r._3)}
}
Processing.scala
object Processing {
private val ratingsPath: String = "Path_to_ratings.csv"
def main(args: Array[String]) {
val env = ExecutionEnvironment.getExecutionEnvironment
val ratings: DataSet[Rating] = PreProcessing.getRatings(env, ratingsPath)
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first(ratings.count().toInt)
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(150)
.setTemporaryPath("/tmp/tmpALS")
val parameters = ParameterMap()
.add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
.add(ALS.Seed, 42L)
als.fit(trainingSet, parameters)
}
}
"但如果我只删除一个评级"
val trainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.sortPartition(0, Order.ASCENDING)
.first((ratings.count()-1).toInt)
错误 :
06/19/2015 15:00:24 CoGroup (CoGroup at org.apache.flink.ml.recommendation.ALS$.updateFactors(ALS.scala:570))(4/4) 切换到 FAILED
java.lang.ArrayIndexOutOfBoundsException: 5
at org.apache.flink.ml.recommendation.ALS$BlockRating.apply(ALS.scala:358)
at org.apache.flink.ml.recommendation.ALS$$anon$111.coGroup(ALS.scala:635)
at org.apache.flink.runtime.operator.CoGroupDriver.run(CoGroupDriver.java:152)
。
问题是 first
运算符与 Flink ALS
实现的 setTemporaryPath
参数相结合。为了理解这个问题,让我快速解释一下阻塞ALS算法是如何工作的。
交替最小二乘的块实现首先将给定的评级矩阵按用户和逐项划分为块。对于这些块,将计算路由信息。此路由信息分别表示哪个用户/项目块从哪个项目/用户块接收哪个输入。之后,启动 ALS 迭代。
由于 Flink 的底层执行引擎是一个并行的流数据流引擎,它试图以流水线的方式执行尽可能多的数据流部分。这需要管道的所有操作员同时联机。这样做的好处是 Flink 避免了实现中间结果,而中间结果可能大得令人望而却步。缺点是可用内存必须在所有正在运行的运算符之间共享。在ALS的情况下,单个DataSet
元素(例如用户/项目块)的大小相当大,这是不希望的。
为了解决这个问题,如果您设置了temporaryPath
,则不会同时执行实现的所有运算符。路径定义中间结果的存储位置。因此,如果已定义临时路径,则ALS
首先计算用户块的路由信息并将其写入磁盘,然后计算项块的路由信息并将其写入磁盘,最后但并非最不重要的一点是,它会启动 ALS 迭代,从临时路径读取路由信息。
用户和项目块的路由信息的计算都取决于给定的评级数据集。在您计算用户路由信息的情况下,它将首先读取评级数据集并对其应用 first
运算符。first
运算符从基础数据集返回n
任意元素。现在的问题是 Flink 不会存储此first
操作的结果来计算项目路由信息。相反,当你开始计算项目路由信息时,Flink 将从其源重新执行数据流。这意味着它从磁盘读取评级数据集,并再次对其应用 first
运算符。在许多情况下,与第一次first
操作的结果相比,这将为您提供一组不同的评级。因此,生成的路由信息不一致,ALS
失败。
您可以通过具体化first
运算符的结果并将此结果用作ALS
算法的输入来规避该问题。对象FlinkMLTools
包含一个方法persist
,该方法采用DataSet
,将其写入给定路径,然后返回读取刚刚写入DataSet
的新DataSet
。这允许你分解生成的数据流图。
val firstTrainingSet : DataSet[(Int, Int, Double)] =
ratings
.map(r => (r.userId, r.movieId, r.rating))
.first((ratings.count()-1).toInt)
val trainingSet = FlinkMLTools.persist(firstTrainingSet, "/tmp/tmpALS/training")
val als = ALS()
.setIterations(10)
.setNumFactors(10)
.setBlocks(150)
.setTemporaryPath("/tmp/tmpALS/")
val parameters = ParameterMap()
.add(ALS.Lambda, 0.01) // After some tests, this value seems to fit the problem
.add(ALS.Seed, 42L)
als.fit(trainingSet, parameters)
或者,您可以尝试将temporaryPath
保留为未设置。然后,所有步骤(路由信息计算和 als 迭代)都以流水线方式执行。这意味着用户和物料工艺路线信息计算都使用由first
运算符生成的相同输入数据集。
Flink 社区目前正在努力将运算符的中间结果保存在内存中。这将允许固定first
运算符的结果,以便它不会被计算两次,因此不会由于其非确定性而给出不同的结果。