Creating a schema from array elements in Scala

I am new to Scala and I am trying to create a custom schema from an array of elements, so that I can read a file based on that new custom schema.

I read the array from a JSON file and used the explode method to create a DataFrame with one row for each element in the columns array.
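For reference, a minimal sketch of what the input JSON might look like (the field names match the schema printed below, including the data_sensitivty spelling; the values are purely illustrative):

{
  "columns": [
    { "column_id": "1", "data_sensitivty": "low", "datatype": "varchar", "length": "50", "name": "object_number" },
    { "column_id": "2", "data_sensitivty": "low", "datatype": "varchar", "length": "50", "name": "function_type" }
  ]
}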

val otherPeople = sqlContext.read.option("multiline", "true").json(otherPeopleDataset)
val column_values = otherPeople.withColumn("columns", explode($"columns")).select("columns.*")
column_values.printSchema()

The output obtained is:

column_values: org.apache.spark.sql.DataFrame = [column_id: string, data_sensitivty: string ... 3 more fields]
root
|-- column_id: string (nullable = true)
|-- data_sensitivty: string (nullable = true)
|-- datatype: string (nullable = true)
|-- length: string (nullable = true)
|-- name: string (nullable = true)

val column_name = column_values.select("name", "datatype")
column_name: org.apache.spark.sql.DataFrame = [name: string, datatype: string]
column_name.show()

+-----------------+--------+
|             name|datatype|
+-----------------+--------+
|    object_number| varchar|
|    function_type| varchar|
|            hof_1| varchar|
|            hof_2| varchar|
|           region| varchar|
|          country| varchar|
+-----------------+--------+

Now, from all of the values listed above, I want to build a schema val dynamically.

Example:

val schema = new StructType()
  .add("object_number", StringType, true)
  .add("function_type", StringType, true)
  .add("hof_1", StringType, true)
  .add("hof_2", StringType, true)
  .add("region", StringType, true)
  .add("country", StringType, true)

I would like to build the struct above dynamically after obtaining the columns DataFrame. I have read that I first need to create a map of data types for each element and then build the struct in a loop. Can anyone help me with this? My knowledge of Scala is limited.

You can collect the DataFrame's rows and add the fields of each row to a StructType:

import org.apache.spark.sql.types._

// Map the metadata's type names to Spark SQL data types
def getFieldType(typeName: String): DataType = typeName match {
  case "varchar" => StringType
  // TODO include other types here
  case _ => StringType
}

// Collect the (name, datatype) rows and fold them into a StructType
val schemaColumns = column_name.collect()
val schema = schemaColumns.foldLeft(new StructType())(
  (schema, columnRow) => schema.add(columnRow.getAs[String]("name"), getFieldType(columnRow.getAs[String]("datatype")), true)
)
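
Once schema has been built this way, it can be applied when reading the data file. A minimal sketch, assuming a comma-separated file at a hypothetical path data.csv:

val df = spark.read
  .option("header", "false")   // assumption: the file has no header row
  .schema(schema)              // apply the dynamically built schema
  .csv("data.csv")             // hypothetical path; adjust format and options as needed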

You can also follow this approach, which works well for your example:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// The schema is encoded in a string
val schemaString = "object_number function_type hof_1 hof_2 region country"
// Generate the schema based on the schema string
val fields = schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)
// Read the text file and convert each comma-separated line to a Row
val rowRDD = sc.textFile("dir")
  .map(line => line.split(","))
  .map(attributes => Row(attributes(0), attributes(1), attributes(2), attributes(3), attributes(4), attributes(5)))
// Apply the schema to the RDD of Rows
val perDF = spark.createDataFrame(rowRDD, schema)
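
To keep this second approach dynamic, as the question asks, the hard-coded schemaString can itself be derived from the column_name DataFrame. A sketch, assuming every column is to be read as a string:

// Collect the field names from the "name" column instead of hard-coding them
val fields = column_name.select("name")
  .collect()
  .map(row => StructField(row.getString(0), StringType, nullable = true))
val schema = StructType(fields)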
