cat department
dept_id,dept_name
1,acc
2,finance
3,sales
4,marketing
为什么在df.show((和rdd.toDF.show((中使用show((时输出不同。有人能帮忙吗?
scala> case class Department (dept_id: Int, dept_name: String)
defined class Department
scala> val dept = sc.textFile("/home/sam/Projects/department")
scala> val mappedDpt = dept.map(p => Department( p(0).toInt,p(1).toString))
scala> mappedDpt.toDF.show()
+-------+---------+
|dept_id|dept_name|
+-------+---------+
| 49| ,|
| 50| ,|
| 51| ,|
| 52| ,|
+-------+---------+
scala>
val dept_df = spark.read
.format("csv")
.option("header","true")
.option("inferSchema","true")
.option("mode","permissive")
.load("/home/sam/Projects/department")
scala> dept_df.show()
+-------+---------+
|dept_id|dept_name|
+-------+---------+
| 1| acc|
| 2| finance|
| 3| sales|
| 4|marketing|
+-------+---------+
scala>
问题在这里
val mappedDpt = dept.map(p => Department( p(0).toInt,p(1).toString))
这里的p
是字符串,而不是行(您可能会认为(。更准确地说,这里p
是文本文件的每一行,您可以确认正在读取scaladoc。
">返回文本文件行的RDD"。
因此,当您应用apply
方法((0)
(时,您正在按行上的位置访问字符
这就是为什么第一个字符的toInt
中的"49, ','"
49返回字符的ascii值,而第二个字符的','
返回该行。
编辑
如果您需要复制read
方法,您可以执行以下操作:
object Department {
/** The Option here is to handle errors. */
def fromRawArray(data: Array[String]): Option[Department] = data match {
case Array(raw_dept_id, dept_name) => Some(Department(raw_dept_id.toInt, dept_name))
case _ => None
}
}
// We use flatMap instead of map, to unwrap the values from the Option, the Nones get removed.
val mappedDpt = dept.flatMap(line => Department.fromRawArray(line.split(",")))
不过,我希望这只是为了学习。在生产代码中,您应该始终使用read
版本。因为它将更加健壮(处理丢失的值、执行更好的类型转换等(
例如,如果第一个值不能强制转换为Int,则上面的代码将抛出异常。
始终使用spark.read.*变体,因为这为您提供了数据帧,您也可以推断模式。
说到您的问题,在RDD版本中,您必须过滤第一行,然后使用逗号分隔符拆分行,然后您可以将其映射到案例类Department。
一旦您将其映射到Department,请注意您正在创建一个类型化的数据帧。。所以它是一个数据集。所以你应该使用createDataset
以下代码对我有效。
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
object RDDSample {
case class Department(dept_id: Int, dept_name: String)
def main(args: Array[String]) {
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession.builder().appName("Spark_processing").master("local[*]").getOrCreate()
import spark.implicits._
val dept = spark.sparkContext.textFile("in/department.txt")
val mappedDpt = dept.filter(line => !line.contains("dept_id")).map(p => {
val y = p.split(","); Department(y(0).toInt, y(1).toString)
})
spark.createDataset(mappedDpt).show
}
}
结果:
+-------+---------+
|dept_id|dept_name|
+-------+---------+
| 1| acc|
| 2| finance|
| 3| sales|
| 4|marketing|
+-------+---------+