我正在尝试编写将RDD转换为数据集的示例Apache Spark程序。但是在这个过程中,我遇到了编译时错误。
这是我的示例代码和错误:
法典:
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset
object Hello {
case class Person(name: String, age: Int)
def main(args: Array[String]){
val conf = new SparkConf()
.setAppName("first example")
.setMaster("local")
val sc = new SparkContext(conf)
val peopleRDD: RDD[Person] = sc.parallelize(Seq(Person("John", 27)))
val people = peopleRDD.toDS
}
}
我的错误是:
value toDS is not a member of org.apache.spark.rdd.RDD[Person]
我添加了Spark core和Spark SQL jars。
我的版本是:
火花 1.6.2
斯卡拉 2.10
Spark 版本 <2.x
toDS
随sqlContext.implicits._
一起提供
val sqlContext = new SQLContext(sc);
import sqlContext.implicits._
val people = peopleRDD.toDS()
Spark 版本>= 2.x
val spark: SparkSession = SparkSession.builder
.config(conf)
.getOrCreate;
import spark.implicits._
val people = peopleRDD.toDS()
嗨
我可以在你的代码中看到两个错误。
首先,您必须import sqlContext.implicits._
toDS
,toDF
是在sqlContext的隐式中定义的。
其次,case class
应该在使用案例类的类范围之外定义,否则会发生task not serializable exception
完整解决方案如下
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.sql.Dataset
object Hello {
def main(args: Array[String]){
val conf = new SparkConf()
.setAppName("first example")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
val peopleRDD: RDD[Person] = sc.parallelize(Seq(Person("John", 27)))
val people = peopleRDD.toDS
people.show(false)
}
}
case class Person(name: String, age: Int)
确切的答案是你导入两者,
import spark.implicits._
import sqlContext.implicits._
这导致了问题,删除其中的任何 1 个,您不会遇到这样的问题