如何在 PySpark 中对大型 Spark 数据帧中的每个行子集执行映射操作



我正在使用PySpark,我想做的是:

一个大的 Spark 数据帧 df 包含所有记录。我想对这个 df 中除以"id"列的每个记录子集进行并行计算。我目前能想到的方式是这样的:(我用一个简单的例子来说明)

dicts = [
    {'id': 1,  'name': 'a',  'score':  100},
    {'id': 1,  'name': 'b',  'score':  150},
    {'id': 2,  'name': 'c',  'score':  200},
    {'id': 2,  'name': 'd',  'score':  300},
]
df = spark.createDataFrame(dicts)
from pyspark.sql.functions import (
    collect_list, 
    struct
)
# df_agg will have the following schema:   id,  a list of structs 
df_agg = df.groupBy('id').agg(
    collect_list(struct(df.columns)).alias('records')
)

但是,当我尝试做

 df_agg.rdd.map(my_func)

其中"my_func"是一些主要进行 Spark 数据帧计算的函数,我遇到了一些问题,不知道如何进行。 my_func对一行进行操作,其中一行['records']现在保存了一个结构列表。如何将此结构列表转换回 Spark 数据帧?

toDF() 不起作用。我尝试了 spark.createDataFrame(list,schema),我什至像原始 DF 使用的那样输入架构,但它仍然不起作用。

我对这些 PySpark 操作相对较新,如果您能让我知道处理此案的正确方法是什么,我将不胜感激。

谢谢!

无法评论您在尝试df_agg.rdd.map(my_func)时遇到的错误(如果您提供my_func我可以尝试的示例)。但是,您提到无法转换为数据帧,因此这是该部分的解决方案:

from pyspark.sql.types import StringType, StructField, StructType, BooleanType, ArrayType, IntegerType
schema=StructType(
               [StructField("id", IntegerType(), True), 
                StructField("records", 
                    ArrayType(StructType([StructField("id", IntegerType(), True),
                        StructField("name", StringType(), True),
                        StructField("score", IntegerType(), True)])))
               ])
df_agg.rdd.toDF(schema=schema).show(2)

相关内容

  • 没有找到相关文章

最新更新