下面以我的一个数据集为例。这是df.printSchema()的结果
member: struct (nullable = true)
| address: struct (nullable = true)
| | city: string (nullable = true)
| | state: string (nullable = true)
| | streetAddress: string (nullable = true)
| | zipCode: string (nullable = true)
| birthDate: string (nullable = true)
| groupIdentification: string (nullable = true)
| memberCode: string (nullable = true)
| patientName: struct (nullable = true)
| | first: string (nullable = true)
| | last: string (nullable = true)
memberContractCode: string (nullable = true)
memberContractType: string (nullable = true)
memberProductCode: string (nullable = true)
这些数据是通过json读取的,我想把它压平,这样所有数据都在同一级别上,这样我的数据帧只包含基元类型,比如:
member.address.city: string (nullable = true)
member.address.state: string (nullable = true)
member.address.streetAddress: string (nullable = true)
member.address.zipCode: string (nullable = true)
member.birthDate: string (nullable = true)
member.groupIdentification: string (nullable = true)
member.memberCode: string (nullable = true)...
我知道这可以通过手动指定列名来完成,如下所示:
df = df.withColumn("member.address.city", df("member.address.city")).withColumn("member.address.state", df("member.address.state"))...
然而,我将无法为我的所有数据集硬编码如上所述的列名,因为程序需要能够在不更改实际代码的情况下动态处理新的数据集。我想做一个通用的方法,可以分解任何类型的结构,因为它已经在数据帧中,并且模式是已知的(但是完整模式的子集)。这在Spark 1.6中可能吗?如果是,如何
这应该做到了-您需要迭代模式并"压平"它,方法是将StructType
类型的字段与"简单"字段分开处理:
// helper recursive method to "flatten" the schema:
def getFields(parent: String, schema: StructType): Seq[String] = schema.fields.flatMap {
case StructField(name, t: StructType, _, _) => getFields(parent + name + ".", t)
case StructField(name, _, _, _) => Seq(s"$parent$name")
}
// apply to our DF's schema:
val fields: Seq[String] = getFields("", df.schema)
// select these fields:
val result = df.select(fields.map(name => $"$name" as name): _*)