我是Spark and Hive的新手,我的目标是将划界(假设CSV)加载到Hive Table上。经过一番阅读后,我发现将数据加载到Hive的路径是csv->dataframe->Hive
。(如果我错了,请纠正我)。
CSV:
1,Alex,70000,Columbus
2,Ryan,80000,New York
3,Johny,90000,Banglore
4,Cook, 65000,Glasgow
5,Starc, 70000,Aus
我读取CSV文件,请使用以下命令:
val csv =sc.textFile("employee_data.txt").map(line => line.split(",").map(elem => elem.trim))
csv: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[29] at map at <console>:39
现在,我正在尝试将此RDD转换为数据框,并使用以下代码:
scala> val df = csv.map { case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) }.toDF()
df: org.apache.spark.sql.DataFrame = [eid: string, name: string, salary: string, destination: string]
员工是一个案例类,我将其用作架构定义。
case class employee(eid: String, name: String, salary: String, destination: String)
但是,当我做df.show
时,我要低于错误:
org.apache.spark.sparkexception:由于阶段失败而流产的工作: 任务0阶段10.0失败了4次,最新失败:丢失任务 0.3阶段10.0(tid 22,user.hostname):scala.matcherror:[ljava.lang.string;@88ba3cb(class [ljava.lang.string;)
我期望将数据框架作为输出。我知道为什么我可能会遇到此错误,因为RDD中的值以Ljava.lang.String;@88ba3cb
格式存储,并且我需要使用mkString
来获取实际值,但是我找不到该方法。感谢您的时间。
如果修复了案例类,则应该有效:
scala> case class employee(eid: String, name: String, salary: String, destination: String)
defined class employee
scala> val txtRDD = sc.textFile("data.txt").map(line => line.split(",").map(_.trim))
txtRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[30] at map at <console>:24
scala> txtRDD.map{case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3)}.toDF.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
| 1| Alex| 70000| Columbus|
| 2| Ryan| 80000| New York|
| 3|Johny| 90000| Banglore|
| 4| Cook| 65000| Glasgow|
| 5|Starc| 70000| Aus|
+---+-----+------+-----------+
否则,您可以将String
转换为Int
:
scala> case class employee(eid: Int, name: String, salary: String, destination: String)
defined class employee
scala> val df = txtRDD.map{case Array(s0, s1, s2, s3) => employee(s0.toInt, s1, s2, s3)}.toDF
df: org.apache.spark.sql.DataFrame = [eid: int, name: string ... 2 more fields]
scala> df.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
| 1| Alex| 70000| Columbus|
| 2| Ryan| 80000| New York|
| 3|Johny| 90000| Banglore|
| 4| Cook| 65000| Glasgow|
| 5|Starc| 70000| Aus|
+---+-----+------+-----------+
但是,最好的解决方案是使用spark-csv
(也将工资也视为Int
)。
还要注意,当您运行df.show
时,该错误是丢弃的,因为直到那时,所有内容都被懒惰地评估。df.show
是一种将导致所有排队转换执行的动作(有关更多信息,请参见本文)。
在数组元素上使用映射,而不是在数组上使用:
val csv = sc.textFile("employee_data.txt")
.map(line => line
.split(",")
.map(e => e.map(_.trim))
)
val df = csv.map { case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) }.toDF()
但是,为什么您正在阅读CSV然后将RDD转换为DF?Spark 1.5已经可以通过spark-csv
软件包读取CSV:
val df = sqlContext.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.option("delimiter", ";")
.load("employee_data.txt")
正如您在评论中所说的那样,您的案例类员工(应命名为Employee
)接收Int
作为其构造函数的第一个参数,但您正在通过String
。因此,在实例化或修改将eid
定义为String
的案例之前,您应该将其转换为Int
。