I am new to Scala and am trying to create a custom schema from an array of elements, so that I can read a file based on that new custom schema.
I read the array from a JSON file and used explode to create a DataFrame with one row per element of the columns array:
val otherPeople = sqlContext.read.option("multiline", "true").json(otherPeopleDataset)
val column_values = otherPeople.withColumn("columns", explode($"columns")).select("columns.*")
column_values.printSchema()
The output obtained is:
column_values: org.apache.spark.sql.DataFrame = [column_id: string, data_sensitivty: string ... 3 more fields]
root
|-- column_id: string (nullable = true)
|-- data_sensitivty: string (nullable = true)
|-- datatype: string (nullable = true)
|-- length: string (nullable = true)
|-- name: string (nullable = true)
val column_name = column_values.select("name","datatype")
column_name: org.apache.spark.sql.DataFrame = [name: string, datatype: string]
column_name.show(4)
+-----------------+--------+
| name|datatype|
+-----------------+--------+
| object_number| varchar|
| function_type| varchar|
| hof_1| varchar|
| hof_2| varchar|
| region| varchar|
| country| varchar|
+-----------------+--------+
Now, for all the values listed above, I want to create a val schema dynamically.
Example:
val schema = new StructType()
.add("object_number",StringType,true)
.add("function_type",StringType,true)
.add("hof_1",StringType,true)
.add("hof_2",StringType,true)
.add("region",StringType,true)
.add("Country",StringType,true)
I want to build the struct above dynamically after obtaining the columns DataFrame. I have read that I first need to create a map of data types for each element and then build the struct in a loop. Can someone help me, as my knowledge of Scala is limited?
You can collect the DataFrame that holds the field data and add each row's field to a StructType:
import org.apache.spark.sql.types.{DataType, StringType, StructType}

// Map the datatype names coming from the file to Spark SQL types
def getFieldType(typeName: String): DataType = typeName match {
  case "varchar" => StringType
  // TODO include other types here
  case _ => StringType
}

val schemaColumns = column_name.collect()
val schema = schemaColumns.foldLeft(new StructType())(
  (schema, columnRow) => schema.add(columnRow.getAs[String]("name"), getFieldType(columnRow.getAs[String]("datatype")), nullable = true)
)
You can follow this approach; it works well for your example:
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

//The schema is encoded in a string
val schemaString = "object_number function_type hof_1 hof_2 region Country"

//Generate the schema based on the string of schema
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

//Convert records of the RDD (myRdd) to Rows
val rowRDD = sc.textFile("dir")
  .map(line => line.split(","))
  .map(attributes => Row(attributes(0), attributes(1), attributes(2), attributes(3), attributes(4), attributes(5)))

//Apply the schema to the RDD
val perDF = spark.createDataFrame(rowRDD, schema)
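If you don't want to hardcode schemaString, it can be derived from the collected name/datatype rows instead. A sketch using plain pairs to stand in for the collected rows (the sample values are hypothetical; with Spark this would come from `column_name.collect()`):

```scala
// Build the space-separated schema string from collected
// (name, datatype) pairs instead of a hardcoded literal.
val collected = Seq(("object_number", "varchar"), ("function_type", "varchar"))
val schemaString = collected.map(_._1).mkString(" ")
```

The resulting string can then be split and mapped to StructFields exactly as shown above.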