I am trying to convert input from a text file into a DataFrame using a schema file that is read at runtime. My input text file looks like this:
John,23
Charles,34
The schema file looks like this:
name:string
age:integer
Here is what I have tried:
import scala.io.Source

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object DynamicSchema {
  def main(args: Array[String]): Unit = {
    val inputFile = args(0)
    val schemaFile = args(1)

    // Read the schema file into a name -> type-name map,
    // e.g. Map("name" -> "string", "age" -> "integer")
    val schemaLines = Source.fromFile(schemaFile, "UTF-8").getLines()
      .map(_.split(":"))
      .map(l => l(0) -> l(1))
      .toMap

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Dynamic Schema")
      .getOrCreate()
    import spark.implicits._

    val input = spark.sparkContext.textFile(args(0))
    val schema = spark.sparkContext.broadcast(schemaLines)

    // Map Spark type names ("integer", "string") to the corresponding DataType
    val nameToType = Seq(IntegerType, StringType)
      .map(t => t.typeName -> t)
      .toMap
    println(nameToType)

    // Build the StructType from the broadcast schema map
    val fields = schema.value
      .map(field => StructField(field._1, nameToType(field._2), nullable = true))
      .toSeq
    val schemaStruct = StructType(fields)

    // Split each line and wrap the raw string values in a Row
    val rowRDD = input
      .map(_.split(","))
      .map(attributes => Row.fromSeq(attributes))

    val peopleDF = spark.createDataFrame(rowRDD, schemaStruct)
    peopleDF.printSchema()

    // Creates a temporary view using the DataFrame
    peopleDF.createOrReplaceTempView("people")

    // SQL can be run over a temporary view created using DataFrames
    val results = spark.sql("SELECT name FROM people")
    results.show()
  }
}
Although printSchema gives the desired result, results.show errors out. I think the age field actually needs to be converted with toInt. Is there a way to achieve the same thing when the schema is only available at runtime?
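For illustration, the kind of per-field conversion described above might look roughly like the sketch below. It keys the cast off the StructType built from the runtime schema and assumes the field order in schemaStruct matches the column order in the text file; this is only a sketch of that workaround, not the accepted fix:

// Sketch: cast each raw string according to the runtime schema before building the Row
val fieldTypes = schemaStruct.fields.map(_.dataType)

val rowRDD = input
  .map(_.split(","))
  .map { attributes =>
    val converted = attributes.zip(fieldTypes).map {
      case (value, IntegerType) => value.trim.toInt   // e.g. "23" -> 23
      case (value, _)           => value               // leave other columns as strings
    }
    Row.fromSeq(converted.toSeq)
  }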
Replace
val input = spark.sparkContext.textFile(args(0))
with
val input = spark.read.schema(schemaStruct).csv(args(0))
and move it so it comes after the schema definition, so that schemaStruct already exists when the file is read.
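Put together, a minimal sketch of the whole program with that change applied might look like the following (same argument layout as in the question). The DataFrame CSV reader parses and casts each column according to the supplied schema, so age comes back as an integer without any manual toInt:

import scala.io.Source

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object DynamicSchema {
  def main(args: Array[String]): Unit = {
    val inputFile = args(0)
    val schemaFile = args(1)

    // name -> type-name map read at runtime, e.g. Map("name" -> "string", "age" -> "integer")
    val schemaLines = Source.fromFile(schemaFile, "UTF-8").getLines()
      .map(_.split(":"))
      .map(l => l(0) -> l(1))
      .toMap

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Dynamic Schema")
      .getOrCreate()

    // Spark type names ("integer", "string") mapped to their DataType
    val nameToType = Seq(IntegerType, StringType).map(t => t.typeName -> t).toMap

    // Build the StructType before reading the data
    val schemaStruct = StructType(
      schemaLines.map { case (name, tpe) => StructField(name, nameToType(tpe), nullable = true) }.toSeq
    )

    // The CSV reader applies the schema while parsing, so no manual casting is needed
    val peopleDF = spark.read.schema(schemaStruct).csv(inputFile)

    peopleDF.printSchema()
    peopleDF.createOrReplaceTempView("people")
    spark.sql("SELECT name FROM people").show()
  }
}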