PySpark-添加一个新的嵌套列或更改现有嵌套列的值



假设我有一个json文件,其行的结构如下:

{
"a": 1,
"b": {
"bb1": 1,
"bb2": 2
}
}

我想更改密钥bb1的值或添加一个新密钥,如:bb3。目前,我使用spark.read.json将json文件加载到spark中作为DataFrame,并使用df.rdd.map将rdd的每一行映射到dict。然后,更改嵌套键值或添加嵌套键,将dict转换为行。最后,将RDD转换为DataFrame。工作流程如下:

def map_func(row):
dictionary = row.asDict(True)
adding new key or changing key value
return as_row(dictionary) # as_row convert dict to row recursively
df = spark.read.json("json_file")
df.rdd.map(map_func).toDF().write.json("new_json_file")

这可能对我有用。但我担心转换DataFrame->RDD(Row->dict->Row)->DataFrame会降低效率。有没有其他方法可以满足这种需求,但不以效率为代价?


我使用的最终解决方案是使用withColumn并动态构建b的模式。首先,我们可以通过从df模式中获得b_schema

b_schema = next(field['type'] for field in df.schema.jsonValue()['fields'] if field['name'] == 'b')

之后,b_schema是dict,我们可以通过在其中添加新字段

b_schema['fields'].append({"metadata":{},"type":"string","name":"bb3","nullable":True})

然后,我们可以通过将其转换为StructType

new_b = StructType.fromJson(b_schema)

在map_func中,我们可以将Row转换为dict并填充新字段:

def map_func(row):
data = row.asDict(True)
data['bb3'] = data['bb1'] + data['bb2']
return data
map_udf = udf(map_func, new_b)
df.withColumn('b', map_udf('b')).collect()

感谢@Mariusz

您可以使用map_func作为udf,从而省略对DF->RDD->DF的转换,仍然具有python实现业务逻辑的灵活性。您只需要创建模式对象:

>>> from pyspark.sql.types import *
>>> new_b = StructType([StructField('bb1', LongType()), StructField('bb2', LongType()), StructField('bb3', LongType())])

然后定义map_func和udf:

>>> from pyspark.sql.functions import *
>>> def map_func(data):
...     return {'bb1': 4, 'bb2': 5, 'bb3': 6}
... 
>>> map_udf = udf(map_func, new_b)

最后将这个UDF应用于数据帧:

>>> df = spark.read.json('sample.json')
>>> df.withColumn('b', map_udf('b')).first()
Row(a=1, b=Row(bb1=4, bb2=5, bb3=6))

编辑

根据评论:您可以用一种更简单的方式将字段添加到现有的StructType中,例如:

>>> df = spark.read.json('sample.json')
>>> new_b = df.schema['b'].dataType.add(StructField('bb3', LongType()))

最新更新