尝试使用广播 ALS mllib 模型时"Task not serializable"异常



我正在尝试使用spark(1.6)流的ALS模型:

val model = MatrixFactorizationModel.load(sc, "./recommender_model/")        
...
val ssc = new StreamingContext( sc, Seconds(2) )
val stream = ssc.createKafkaStream[String, String, StringDeserializer, StringDeserializer](
                     kafkaProps,
                     List(kafkaProps.getConfig("kafka.topic"))
                     )
val broadcastModel = sc.broadcast(model)
stream.
    map( line => line._2.split(",") ).
    map( arr => {           
        val userId = arr(0).toInt
        val movieId = arr(1).toInt
        val model = broadcastModel.value
        model.predict(userId, movieId)
    }).
    print()

然而,当我尝试运行代码时,我得到以下异常:

Name: org.apache.spark.SparkException
Message: Task not serializable
StackTrace: org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
org.apache.spark.SparkContext.clean(SparkContext.scala:2059)
org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
org.apache.spark.SparkContext.withScope(SparkContext.scala:719)
org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:409)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:420)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:422)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:424)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:426)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:428)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:430)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:432)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:434)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:436)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:438)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:440)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:442)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:444)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:446)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:448)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:450)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:452)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:454)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:456)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:458)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:460)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:462)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:464)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:466)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:468)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:470)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:472)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:474)
$line906.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:476)
$line906.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:478)
$line906.$read$$iwC$$iwC$$iwC.<init>(<console>:480)
$line906.$read$$iwC$$iwC.<init>(<console>:482)
$line906.$read$$iwC.<init>(<console>:484)
$line906.$read.<init>(<console>:486)
$line906.$read$.<init>(<console>:490)
$line906.$read$.<clinit>(<console>)
$line906.$eval$.<init>(<console>:7)
$line906.$eval$.<clinit>(<console>)
$line906.$eval.$print(<console>)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:95)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:55)
java.lang.reflect.Method.invoke(Method.java:507)
org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:296)
com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1$$anonfun$apply$3.apply(ScalaInterpreter.scala:291)
com.ibm.spark.global.StreamState$.withStreams(StreamState.scala:80)
com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
com.ibm.spark.interpreter.ScalaInterpreter$$anonfun$interpretAddTask$1.apply(ScalaInterpreter.scala:290)
com.ibm.spark.utils.TaskManager$$anonfun$add$2$$anon$1.run(TaskManager.scala:123)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1153)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
java.lang.Thread.run(Thread.java:785)

我应该怎么做?

我相信你们找错人了,广播在这里真的不是问题。MatrixFactorizationModel存储两个rdd:

  • userFeatures
  • productFeatures

predict同时执行lookup。因为它是一个动作,它不能从map转换执行。你必须使用transform:

stream.
  .map(_._2.split(","))
  .map {
    case Array(userId, movieId) => (userId.toInt, movieId.toInt) }      
  .transform(rdd => model.predict(rdd))

最后我是这样做的:

// load the saved model
val model = MatrixFactorizationModel.load(sc, "./recommender_model/")
// test the model
println( "Prediction for user=1, movie=500 is " + model.predict(1, 500) ) 
...
val ssc = new StreamingContext( sc, Seconds(2) )
val stream = ssc.createKafkaStream[String, String, StringDeserializer, StringDeserializer](
                     kafkaProps,
                     List(kafkaProps.getConfig("kafka.topic"))
                     )
// let's wrap the predict function with a try catch block
def predict(userId: Int, movieId: Int): Try[Any] = {
    Try(model.predict(userId, movieId))
}
val moviesToRate = stream.map(_._2.split(","))
moviesToRate.foreachRDD( rdd => {
    for(item <- rdd.collect().toArray) {
        val userId = item(0).toInt
        val movieId = item(1).toInt     
        val prediction = predict(userId, movieId).getOrElse(-1)
        // TODO: the predictions should be put on another topic to be consumed
        // by the client application (e.g. a web app)
        println(s"$userId, $movieId, $prediction")
    }
})