PySpark: dynamically creating a StructType



I have a situation where my data looks like this:

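id | values                                               | schema
2  | {"colA":3.2,"colB":"val2","colC":3.4}                | {'colA':'FLOAT', 'colB':'STRING', 'colC':'FLOAT'}
3  | {"colC":3.2,"colX":3.9}                              | {'colC':'FLOAT', 'colX':'FLOAT'}
4  | {"colG":"val1","colH":93.2}                          | {'colG':'STRING', 'colH':'FLOAT'}
5  | {"colG":"val4","colA":4.2,"colJ":93.2,"colM":"val4"} | {'colG':'STRING', 'colA':'FLOAT', 'colJ':'FLOAT', 'colM':'STRING'}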

You can build a single schema string that covers every target column, then use `from_json` with that schema to parse the `values` field.
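As a rough, Spark-free illustration of the schema-merging idea, the sketch below combines the per-row schema strings into one DDL-style string. It assumes each row's `schema` is a JSON-like object written with single quotes (as the concatenated output in the example suggests); the helper name `merge_schemas` is hypothetical.

```python
import json


def merge_schemas(schema_strings):
    """Merge per-row schema strings such as "{'colA':'FLOAT'}" into one DDL string.

    Hypothetical helper for illustration only: it mirrors the collect-and-merge
    step of the Spark example, but works on plain Python strings.
    """
    merged = {}
    for s in schema_strings:
        # JSON requires double quotes, so swap the single quotes first.
        merged.update(json.loads(s.replace("'", '"')))
    # Sort by column name so the field order does not depend on row order.
    return ', '.join(f'{name} {typ.lower()}' for name, typ in sorted(merged.items()))


rows = [
    "{'colA':'FLOAT', 'colB':'STRING', 'colC':'FLOAT'}",
    "{'colC':'FLOAT', 'colX':'FLOAT'}",
    "{'colG':'STRING', 'colH':'FLOAT'}",
    "{'colG':'STRING', 'colA':'FLOAT', 'colJ':'FLOAT', 'colM':'STRING'}",
]
print(merge_schemas(rows))
# colA float, colB string, colC float, colG string, colH float, colJ float, colM string, colX float
```

If the same column appears with different types in different rows, the last one seen wins here (just as `json.loads` keeps the last duplicate key), so conflicting types would need explicit handling.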

Example
from pyspark.sql import functions as func

# create target schema: concatenate every row's schema string into one JSON-like object
jsonsch = data_sdf. \
    groupBy(func.lit(1).alias('dropme')). \
    agg(func.array_join(func.collect_list('schema'), ',').alias('allsch')). \
    withColumn('allsch', func.regexp_replace('allsch', r'\},\{', ', ')). \
    select('allsch'). \
    collect()[0][0]
# "{'colA':'FLOAT', 'colB':'STRING', 'colC':'FLOAT', 'colC':'FLOAT', 'colX':'FLOAT', 'colG':'STRING', 'colH':'FLOAT', 'colG':'STRING', 'colA':'FLOAT', 'colJ':'FLOAT', 'colM':'STRING'}"
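import json
# duplicate keys collapse into one entry; sort so the field order is deterministic
# (collect_list does not guarantee any row order)
jsonschema = ', '.join([k + ' ' + v.lower() for k, v in sorted(json.loads(jsonsch.replace("'", '"')).items())])
# "colA float, colB string, colC float, colG string, colH float, colJ float, colM string, colX float"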
# parse the `values` column using the target schema (a DDL-formatted string)
data_sdf. \
    withColumn('parsed_val', func.from_json('values', jsonschema)). \
    selectExpr('id', 'parsed_val'). \
    show(truncate=False)
# +---+------------------------------------------------+
# |id |parsed_val                                      |
# +---+------------------------------------------------+
# |2  |{3.2, val2, 3.4, null, null, null, null, null}  |
# |3  |{null, null, 3.2, null, null, null, null, 3.9}  |
# |4  |{null, null, null, val1, 93.2, null, null, null}|
# |5  |{4.2, null, null, val4, null, 93.2, val4, null} |
# +---+------------------------------------------------+
# .printSchema() of the same DataFrame:
# root
#  |-- id: integer (nullable = true)
#  |-- parsed_val: struct (nullable = true)
#  |    |-- colA: float (nullable = true)
#  |    |-- colB: string (nullable = true)
#  |    |-- colC: float (nullable = true)
#  |    |-- colG: string (nullable = true)
#  |    |-- colH: float (nullable = true)
#  |    |-- colJ: float (nullable = true)
#  |    |-- colM: string (nullable = true)
#  |    |-- colX: float (nullable = true)
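The `null`s in `parsed_val` are simply the fields a row's JSON does not contain. A plain-Python analogue of that per-row behaviour is sketched below; `FIELDS` and `parse_with_schema` are hypothetical names, and the sketch ignores Spark's type casting and malformed-input handling.

```python
import json

# Target field order, matching the parsed_val struct shown above (sorted by name).
FIELDS = ['colA', 'colB', 'colC', 'colG', 'colH', 'colJ', 'colM', 'colX']


def parse_with_schema(values_json, fields=FIELDS):
    """Hypothetical analogue of from_json: missing keys become None (Spark's null).

    Keys that are not in the target schema are ignored.
    """
    obj = json.loads(values_json)
    return tuple(obj.get(name) for name in fields)


data = {
    2: '{"colA":3.2,"colB":"val2","colC":3.4}',
    3: '{"colC":3.2,"colX":3.9}',
    4: '{"colG":"val1","colH":93.2}',
    5: '{"colG":"val4","colA":4.2,"colJ":93.2,"colM":"val4"}',
}
for row_id, values in data.items():
    print(row_id, parse_with_schema(values))
```

Each printed tuple lines up with the corresponding `parsed_val` row in the `show()` output, with `None` where Spark shows `null`.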
