案例类映射到csv


cat department 
dept_id,dept_name
1,acc
2,finance
3,sales
4,marketing

为什么在df.show((和rdd.toDF.show((中使用show((时输出不同。有人能帮忙吗?

scala> case class Department (dept_id: Int, dept_name: String)
defined class Department
scala> val dept = sc.textFile("/home/sam/Projects/department")
scala> val mappedDpt = dept.map(p => Department( p(0).toInt,p(1).toString))
scala> mappedDpt.toDF.show()
+-------+---------+
|dept_id|dept_name|
+-------+---------+
|     49|        ,|
|     50|        ,|
|     51|        ,|
|     52|        ,|
+-------+---------+

scala> 
val dept_df = spark.read
.format("csv")
.option("header","true")
.option("inferSchema","true")
.option("mode","permissive")
.load("/home/sam/Projects/department")
scala> dept_df.show()
+-------+---------+
|dept_id|dept_name|
+-------+---------+
|      1|      acc|
|      2|  finance|
|      3|    sales|
|      4|marketing|
+-------+---------+

scala> 

问题在这里

val mappedDpt = dept.map(p => Department( p(0).toInt,p(1).toString))

这里的p字符串,而不是(您可能会认为(。更准确地说,这里p是文本文件的每一行,您可以确认正在读取scaladoc。

">返回文本文件行的RDD"。

因此,当您应用apply方法((0)(时,您正在按行上的位置访问字符
这就是为什么第一个字符的toInt中的"49, ','"49返回字符的ascii值,而第二个字符的','返回该行。

编辑

如果您需要复制read方法,您可以执行以下操作:

object Department {
/** The Option here is to handle errors. */
def fromRawArray(data: Array[String]): Option[Department] = data match {
case Array(raw_dept_id, dept_name) => Some(Department(raw_dept_id.toInt, dept_name))
case _ => None
}
}
// We use flatMap instead of map, to unwrap the values from the Option, the Nones get removed.
val mappedDpt = dept.flatMap(line => Department.fromRawArray(line.split(",")))

不过,我希望这只是为了学习。在生产代码中,您应该始终使用read版本。因为它将更加健壮(处理丢失的值、执行更好的类型转换等(
例如,如果第一个值不能强制转换为Int,则上面的代码将抛出异常。

始终使用spark.read.*变体,因为这为您提供了数据帧,您也可以推断模式。

说到您的问题,在RDD版本中,您必须过滤第一行,然后使用逗号分隔符拆分行,然后您可以将其映射到案例类Department。

一旦您将其映射到Department,请注意您正在创建一个类型化的数据帧。。所以它是一个数据集。所以你应该使用createDataset

以下代码对我有效。

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
object RDDSample {
case class Department(dept_id: Int, dept_name: String)
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession.builder().appName("Spark_processing").master("local[*]").getOrCreate()
import spark.implicits._
val dept = spark.sparkContext.textFile("in/department.txt")
val mappedDpt = dept.filter(line => !line.contains("dept_id")).map(p => {
val y = p.split(","); Department(y(0).toInt, y(1).toString)
})
spark.createDataset(mappedDpt).show
}
}

结果:

+-------+---------+
|dept_id|dept_name|
+-------+---------+
|      1|      acc|
|      2|  finance|
|      3|    sales|
|      4|marketing|
+-------+---------+

最新更新