我正在尝试使用Kryo注册在spark中运行一个非常简单的scala类。这个类只是将文件中的数据加载到RDD[LabeledPoint]
中。
代码(灵感来自https://spark.apache.org/docs/latest/mllib-decision-tree.html):
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
object test {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local").setAppName("test")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.kryo.registrationRequired", "true")
val sc = new SparkContext(conf)
sc.getConf.registerKryoClasses(classOf[ org.apache.spark.mllib.regression.LabeledPoint ])
sc.getConf.registerKryoClasses(classOf[ org.apache.spark.rdd.RDD[org.apache.spark.mllib.regression.LabeledPoint] ])
// Load data
val rawData = sc.textFile("data/mllib/sample_tree_data.csv")
val data = rawData.map { line =>
val parts = line.split(',').map(_.toDouble)
LabeledPoint(parts(0), Vectors.dense(parts.tail))
}
sc.stop()
System.exit(0)
}
}
我的理解是,由于我已经设置了spark.kryo.registrationRequired = true
,所有使用的类都必须注册,因此我已经注册了RDD[LabeledPoint]
和LabeledPoint
。
问题
我收到以下错误:
java.lang.IllegalArgumentException: Class is not registered: org.apache.spark.mllib.regression.LabeledPoint[]
Note: To register this class use: kryo.register(org.apache.spark.mllib.regression.LabeledPoint[].class);
at com.esotericsoftware.kryo.Kryo.getRegistration(Kryo.java:442)
at com.esotericsoftware.kryo.util.DefaultClassResolver.writeClass(DefaultClassResolver.java:79)
at com.esotericsoftware.kryo.Kryo.writeClass(Kryo.java:472)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:565)
at org.apache.spark.serializer.KryoSerializerInstance.serialize(KryoSerializer.scala:162)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
据我所知,这意味着类LabeledPoint[]
没有注册,而我已经注册了类LabeledPoint
。
此外,在错误中提出的注册类(kryo.register(org.apache.spark.mllib.regression.LabeledPoint[].class);
)的代码不起作用。
- 这两个班有什么区别
- 我该如何注册这门课
非常感谢@eliasah,他指出所提出的解决方案(kryo.register(org.apache.spark.mllib.regression.LabeledPoint[].class);
)在Java
中,而不是在Scala中。
因此,LabeledPoint[]
在Scala中的含义就是Array[LabeledPoint]
。
我通过注册Array[LabeledPoint]
类解决了我的问题,即在我的代码中添加:
sc.getConf.registerKryoClasses(classOf[ Array[org.apache.spark.mllib.regression.LabeledPoint] ])