Spark DataFrame - Encoders



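I am new to Scala and Spark.

I am trying to use an encoder to read a file in Spark and then convert it to Java/Scala objects.

The first step is to read the file, applying a schema and encoding it with the as method.

I then use that Dataset/DataFrame for a simple map operation, but when I print the schema of the resulting Dataset or DataFrame, it does not print any columns.

Also, when I first read the file I do not map the age field in the Person class; I only compute it in the map function as an experiment. But I never see age mapped into the DataFrame through Person.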

Data in Person.txt:

firstName,lastName,dob
ABC, XYZ, 01/01/2019
CDE, FGH, 01/02/2020

Here is the code:

import java.time.Year

import org.apache.spark.sql.{Encoders, SparkSession}

object EncoderExample extends App {
  val sparkSession = SparkSession.builder().appName("EncoderExample").master("local").getOrCreate()

  case class Person(firstName: String, lastName: String, dob: String, var age: Int = 10)

  implicit val encoder = Encoders.bean[Person](classOf[Person])

  val personDf = sparkSession.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("Person.txt")
    .as(encoder)
  personDf.printSchema()
  personDf.show()

  val calAge = personDf.map(p => {
    p.age = Year.now().getValue - p.dob.substring(6).toInt
    println(p.age)
    p
  }) //.toDF()//.as(encoder)

  print("*********Person DF Schema after age calculation: ")
  calAge.printSchema()
  //calAge.show
}
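One likely reason the schema comes out empty: Encoders.bean is meant for JavaBean-style classes with getX/setX accessors, and a Scala case class does not generate those by default. The sketch below uses only the JDK's java.beans.Introspector to list the bean properties of two classes, with no Spark involved. PlainPerson, BeanPerson, and BeanIntrospection are hypothetical names introduced for illustration, and the idea that Spark's bean encoder discovers columns through this kind of introspection is an assumption about its internals.

```scala
import java.beans.Introspector
import scala.beans.BeanProperty

// Same shape as the case class in the question: accessors are firstName(), age_=(...),
// not getFirstName()/setAge(...), so it exposes no JavaBean properties.
case class PlainPerson(firstName: String, lastName: String, dob: String, var age: Int = 10)

// Hypothetical bean-style variant: @BeanProperty generates getX/setX for each var.
class BeanPerson(
  @BeanProperty var firstName: String,
  @BeanProperty var lastName: String,
  @BeanProperty var dob: String,
  @BeanProperty var age: Int
) {
  // JavaBeans conventionally provide a no-argument constructor.
  def this() = this(null, null, null, 0)
}

object BeanIntrospection {
  // Names of the JavaBean properties of a class, ignoring the "class"
  // property that every object gets from getClass.
  def beanProperties(cls: Class[_]): Seq[String] =
    Introspector.getBeanInfo(cls).getPropertyDescriptors.toSeq
      .map(_.getName)
      .filterNot(_ == "class")
      .sorted
}
```

Calling BeanIntrospection.beanProperties(classOf[PlainPerson]) finds no properties, while the BeanPerson variant exposes age, dob, firstName, and lastName. For case classes, the usual route is the product encoder that import spark.implicits._ brings into scope, which is what df.as[Person] relies on in the code that follows.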
package spark

import java.text.SimpleDateFormat
import java.util.Calendar

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Target type for the final Dataset; age is a Long to match the UDF's return type.
case class Person(firstName: String, lastName: String, dob: String, age: Long)

object CalcAge extends App {

  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  // Brings the implicit case-class encoders and the toDF syntax into scope.
  import spark.implicits._

  val sourceDF = Seq(
    ("ABC", "XYZ", "01/01/2019"),
    ("CDE", "FGH", "01/02/2020")
  ).toDF("firstName", "lastName", "dob")

  sourceDF.printSchema
  //  root
  //  |-- firstName: string (nullable = true)
  //  |-- lastName: string (nullable = true)
  //  |-- dob: string (nullable = true)

  sourceDF.show(false)
  //  +---------+--------+----------+
  //  |firstName|lastName|dob       |
  //  +---------+--------+----------+
  //  |ABC      |XYZ     |01/01/2019|
  //  |CDE      |FGH     |01/02/2020|
  //  +---------+--------+----------+

  // Current calendar year as a Long.
  def getCurrentYear: Long = {
    val today: java.util.Date = Calendar.getInstance.getTime
    val timeFormat = new SimpleDateFormat("yyyy")
    timeFormat.format(today).toLong
  }

  // The year is the last "/"-separated part of dob; age is the difference in calendar years.
  val ageUDF = udf((d1: String) => {
    val year = d1.split("/").reverse.head.toLong
    val yearNow = getCurrentYear
    yearNow - year
  })

  // Add age as a real column so that it shows up in the schema.
  val df = sourceDF
    .withColumn("age", ageUDF(col("dob")))

  df.printSchema
  //  root
  //  |-- firstName: string (nullable = true)
  //  |-- lastName: string (nullable = true)
  //  |-- dob: string (nullable = true)
  //  |-- age: long (nullable = false)

  // The age values below come from a run in 2020.
  df.show(false)
  //  +---------+--------+----------+---+
  //  |firstName|lastName|dob       |age|
  //  +---------+--------+----------+---+
  //  |ABC      |XYZ     |01/01/2019|1  |
  //  |CDE      |FGH     |01/02/2020|0  |
  //  +---------+--------+----------+---+

  // Convert the rows to Person objects using the implicit case-class encoder.
  val person = df.as[Person].collectAsList()
  //  person: java.util.List[Person] = [Person(ABC,XYZ,01/01/2019,1), Person(CDE,FGH,01/02/2020,0)]
  println(person)

}
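The UDF above extracts the year by splitting the dob string on "/". As an alternative, here is a minimal sketch that parses the whole date with java.time and trims the value first, which matters because the dob values in Person.txt carry a leading space after the comma. AgeCalc, yearDiff, and fullYears are hypothetical names, and the dd/MM/yyyy pattern is an assumption, since the sample data does not reveal the day/month order.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

object AgeCalc {
  // Assumption: dob strings look like "01/01/2019" (day/month/year).
  private val dobFormat = DateTimeFormatter.ofPattern("dd/MM/yyyy")

  // Difference in calendar years, the same quantity the UDF computes as yearNow - year.
  def yearDiff(dob: String, today: LocalDate): Long =
    today.getYear.toLong - LocalDate.parse(dob.trim, dobFormat).getYear

  // Completed years between dob and today, which also accounts for month and day.
  def fullYears(dob: String, today: LocalDate): Long =
    ChronoUnit.YEARS.between(LocalDate.parse(dob.trim, dobFormat), today)
}
```

Passing today as a parameter keeps the functions deterministic. Inside a UDF, one could call AgeCalc.yearDiff(d1, LocalDate.now()) to get the same quantity as the original calculation.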
