RDD[Array[String]] to Dataframe



我是Spark and Hive的新手,我的目标是将划界(假设CSV)加载到Hive Table上。经过一番阅读后,我发现将数据加载到Hive的路径是csv->dataframe->Hive。(如果我错了,请纠正我)。

CSV:
1,Alex,70000,Columbus
2,Ryan,80000,New York
3,Johny,90000,Banglore
4,Cook, 65000,Glasgow
5,Starc, 70000,Aus

我读取CSV文件,请使用以下命令:

val csv =sc.textFile("employee_data.txt").map(line => line.split(",").map(elem => elem.trim))
csv: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[29] at map at <console>:39

现在,我正在尝试将此RDD转换为数据框,并使用以下代码:

scala> val df = csv.map { case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) }.toDF()
df: org.apache.spark.sql.DataFrame = [eid: string, name: string, salary: string, destination: string]

员工是一个案例类,我将其用作架构定义。

case class employee(eid: String, name: String, salary: String, destination: String)

但是,当我做df.show时,我要低于错误:

org.apache.spark.sparkexception:由于阶段失败而流产的工作: 任务0阶段10.0失败了4次,最新失败:丢失任务 0.3阶段10.0(tid 22,user.hostname):scala.matcherror:[ljava.lang.string;@88ba3cb(class [ljava.lang.string;)

我期望将数据框架作为输出。我知道为什么我可能会遇到此错误,因为RDD中的值以Ljava.lang.String;@88ba3cb格式存储,并且我需要使用mkString来获取实际值,但是我找不到该方法。感谢您的时间。

如果修复了案例类,则应该有效:

scala> case class employee(eid: String, name: String, salary: String, destination: String)
defined class employee
scala> val txtRDD = sc.textFile("data.txt").map(line => line.split(",").map(_.trim))
txtRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[30] at map at <console>:24
scala> txtRDD.map{case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3)}.toDF.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
|  1| Alex| 70000|   Columbus|
|  2| Ryan| 80000|   New York|
|  3|Johny| 90000|   Banglore|
|  4| Cook| 65000|    Glasgow|
|  5|Starc| 70000|        Aus|
+---+-----+------+-----------+

否则,您可以将String转换为Int

scala> case class employee(eid: Int, name: String, salary: String, destination: String)
defined class employee
scala> val df = txtRDD.map{case Array(s0, s1, s2, s3) => employee(s0.toInt, s1, s2, s3)}.toDF
df: org.apache.spark.sql.DataFrame = [eid: int, name: string ... 2 more fields]
scala> df.show
+---+-----+------+-----------+
|eid| name|salary|destination|
+---+-----+------+-----------+
|  1| Alex| 70000|   Columbus|
|  2| Ryan| 80000|   New York|
|  3|Johny| 90000|   Banglore|
|  4| Cook| 65000|    Glasgow|
|  5|Starc| 70000|        Aus|
+---+-----+------+-----------+

但是,最好的解决方案是使用spark-csv(也将工资也视为Int)。

还要注意,当您运行df.show时,该错误是丢弃的,因为直到那时,所有内容都被懒惰地评估。df.show是一种将导致所有排队转换执行的动作(有关更多信息,请参见本文)。

在数组元素上使用映射,而不是在数组上使用:

val csv = sc.textFile("employee_data.txt")
    .map(line => line
                     .split(",")
                     .map(e => e.map(_.trim))
     )
val df = csv.map { case Array(s0, s1, s2, s3) => employee(s0, s1, s2, s3) }.toDF()

但是,为什么您正在阅读CSV然后将RDD转换为DF?Spark 1.5已经可以通过spark-csv软件包读取CSV:

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") 
    .option("inferSchema", "true") 
    .option("delimiter", ";") 
    .load("employee_data.txt")

正如您在评论中所说的那样,您的案例类员工(应命名为Employee)接收Int作为其构造函数的第一个参数,但您正在通过String。因此,在实例化或修改将eid定义为String的案例之前,您应该将其转换为Int

相关内容

  • 没有找到相关文章

最新更新