PySpark - Summing values inside a struct



I have a schema like this:

root
|-- DataColumn1: struct (nullable = true)
|    |-- colA: double (nullable = true)
|    |-- colB: struct (nullable = true)
|    |    |-- fieldA: double (nullable = true)
|    |    |-- fieldB: double (nullable = true)
|    |    |-- fieldC: double (nullable = true)
|    |-- colC: long (nullable = true)
|    |-- colD: string (nullable = true)
|-- DataColumn2: string (nullable = true)
|-- DataColumn3: string (nullable = true)

My goal is to create a new column, say "DataColumn4", which is the sum of all the fields "fieldA", "fieldB" and "fieldC" (fieldA + fieldB + fieldC) of the struct "colB", which itself sits inside "DataColumn1".

"colB"中可能有N个字段,那么在不通过Data第1.colB.fieldA、Data第1.collB.fieldB等逐个访问字段的情况下,如何对它们进行求和?

Sample data:

DataColumn1                 DataColumn2             DataColumn3
(1, (1, 2, 3), 4, 5)           XXX                      YYY
(1, (2, 3, 3), 8, 9)           XYZ                      XYX

My expected result must have a new column that is the sum of the nested fields:

DataColumn1             DataColumn2             DataColumn3    DataColumn4
(1, (1, 2, 3), 4, 5)       XXX                      YYY          6 (since 1+2+3 = 6)
(1, (2, 3, 3), 8, 9)       XYZ                      XYX          8 (since 2+3+3 = 8)

How can I write this in PySpark without using a pandas UDF?

You can access a field inside a struct with struct_column.struct_field, for example DataColumn1.colB.fieldA. You can also use DataColumn1.colB.* to select all of the struct's fields at once.
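
For instance, on the schema from the question, the two access styles would look like this (a minimal sketch; "df" is just an assumed name for the question's DataFrame):

# dotted path to one nested field
df.select('DataColumn1.colB.fieldA').show()

# star-expansion: every field inside the nested struct as a separate column
df.select('DataColumn1.colB.*').show()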

Below is an example of how to sum them. Given the following data:

+--------------------+
|        c1_c2c3c4_c5|
+--------------------+
|{1, {1, 2, 3}, 4, 5}|
|{1, {2, 3, 3}, 8, 9}|
+--------------------+
root
|-- c1_c2c3c4_c5: struct (nullable = false)
|    |-- c1: long (nullable = true)
|    |-- c2c3c4: struct (nullable = false)
|    |    |-- c2: long (nullable = true)
|    |    |-- c3: long (nullable = true)
|    |    |-- c4: long (nullable = true)
|    |-- c5: long (nullable = true)
|    |-- c6: long (nullable = true)
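
For reference, a DataFrame with this shape could be built with something like the following (a minimal sketch; the values and column names are only chosen to match the printed schema above):

from pyspark.sql import SparkSession, functions as func

spark = SparkSession.builder.getOrCreate()

# build the flat columns first, then wrap them into a single struct column
data_sdf = spark.createDataFrame(
    [(1, (1, 2, 3), 4, 5), (1, (2, 3, 3), 8, 9)],
    'c1 long, c2c3c4 struct<c2:long,c3:long,c4:long>, c5 long, c6 long'
).select(func.struct('c1', 'c2c3c4', 'c5', 'c6').alias('c1_c2c3c4_c5'))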

To sum those fields, we first need their names, which we can extract with a select:

data_sdf.select('c1_c2c3c4_c5.c2c3c4.*').columns
# ['c2', 'c3', 'c4']

The actual summing code:

from functools import reduce
from pyspark.sql import functions as func

# use reduce to add up all struct fields
struct_field_sum = reduce(
    lambda x, y: x + y,
    [func.col('c1_c2c3c4_c5.c2c3c4.' + k)
     for k in data_sdf.select('c1_c2c3c4_c5.c2c3c4.*').columns]
)
# Column<'((c1_c2c3c4_c5.c2c3c4.c2 + c1_c2c3c4_c5.c2c3c4.c3) + c1_c2c3c4_c5.c2c3c4.c4)'>

data_sdf. \
    withColumn('reqd_sum', struct_field_sum). \
    show()
# +--------------------+--------+
# |        c1_c2c3c4_c5|reqd_sum|
# +--------------------+--------+
# |{1, {1, 2, 3}, 4, 5}|       6|
# |{1, {2, 3, 3}, 8, 9}|       8|
# +--------------------+--------+
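
Mapped back onto the schema from the question, the same reduce pattern would look roughly like this (a sketch, assuming the question's DataFrame is named df):

from functools import reduce
from pyspark.sql import functions as func

# collect the names of all fields nested under DataColumn1.colB
colB_fields = df.select('DataColumn1.colB.*').columns

# fold them into a single sum expression and attach it as DataColumn4
df = df.withColumn(
    'DataColumn4',
    reduce(lambda x, y: x + y,
           [func.col('DataColumn1.colB.' + k) for k in colB_fields])
)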

To achieve this, you need to access the sub-fields of the StructType and then reduce over them; alternatively, you can project them into a list and use Python's built-in sum.

Example:

from functools import reduce

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

sql = SparkSession.builder.getOrCreate()

structureData = [
    ("James", "Smith", "36636", "M", (3100, 200)),
    ("Michael", "Rose", "40288", "M", (4300, 200)),
    ("Robert", "Williams", "42114", "M", (1400, 300)),
    ("Maria", "Jones", "39192", "F", (5500, 300)),
    ("Jen", "Brown", "39156", "F", (5000, 600))
]

structureSchema = StructType([
    StructField('firstname', StringType(), True),
    StructField('lastname', StringType(), True),
    StructField('id', StringType(), True),
    StructField('gender', StringType(), True),
    StructField('salary', StructType([
        StructField('component1', IntegerType(), True),
        StructField('component2', IntegerType(), True)
    ]))
])

sparkDF = sql.createDataFrame(data=structureData, schema=structureSchema)
sparkDF.printSchema()
sparkDF.show(truncate=False)

sparkDF = (
    sparkDF
    # variant 1: fold the struct's sub-fields together with functools.reduce
    .withColumn('total_salary',
                reduce(lambda a, b: a + b,
                       [F.col(f'salary.{c}') for c in sparkDF.schema['salary'].dataType.names]))
    # variant 2: Python's built-in sum over the projected struct fields
    .withColumn('total_salary_2',
                sum(sparkDF[f'salary.{c}'] for c in sparkDF.schema['salary'].dataType.names))
)

sparkDF.show()
+---------+--------+-----+------+-----------+------------+--------------+
|firstname|lastname|   id|gender|     salary|total_salary|total_salary_2|
+---------+--------+-----+------+-----------+------------+--------------+
|    James|   Smith|36636|     M|{3100, 200}|        3300|          3300|
|  Michael|    Rose|40288|     M|{4300, 200}|        4500|          4500|
|   Robert|Williams|42114|     M|{1400, 300}|        1700|          1700|
|    Maria|   Jones|39192|     F|{5500, 300}|        5800|          5800|
|      Jen|   Brown|39156|     F|{5000, 600}|        5600|          5600|
+---------+--------+-----+------+-----------+------------+--------------+
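
Applied to the schema from the question (again assuming the DataFrame is named df), the built-in-sum variant would look roughly like this:

# names of the fields nested under DataColumn1.colB
colB_fields = df.schema['DataColumn1'].dataType['colB'].dataType.names

# 0 + Column + Column + ... still yields a Column expression
df = df.withColumn(
    'DataColumn4',
    sum(df[f'DataColumn1.colB.{c}'] for c in colB_fields)
)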

References:

  • What is the correct way in pyspark to sum dataframe columns from a list
