如何生成,然后减少,从每一行的一个DataFrame在PySpark大量的数据集?



不幸的是,我不能分享我的实际代码或数据,因为它是专有的,但如果读者从文本中不清楚问题,我可以生成MWE。

我正在处理一个包含~ 5000万行的数据框,每一行包含一个大的XML文档。从每个XML文档中,我提取了一组与出现次数和标记之间的层次关系相关的统计信息(没有什么比未记录的XML格式更让人高兴的了)。我可以用数据框表示这些统计信息,并且可以使用GROUP BY/SUM和DISTINCT等标准操作将这些数据框组合到多个文档上。目标是提取所有5000万个文档的统计信息,并在单个数据帧中表示它们。

问题是我不知道如何在Spark中从一个数据框的每一行有效地生成5000万数据框,或者如何告诉Spark使用二进制运算符将5000万数据框的列表减少到一个数据框。有做这些事情的标准函数吗?

到目前为止,我发现的唯一解决方法效率非常低(将数据存储为字符串,对其进行解析,进行计算,然后将其转换回字符串)。用这种方法要花几个星期才能完成,所以不实用。

来自每行的每个XML响应的提取和统计数据可以存储在该行本身的其他列中。这样,spark应该能够在多个执行器中执行进程,从而提高性能。这是一个伪代码。

from pyspark.sql.types import StructType, StructField, IntegerType, 
StringType, DateType, FloatType, ArrayType
def extract_metrics_from_xml(row):
j = row['xmlResponse'] # assuming your xml column name is xmlResponse
# perform your xml extractions and computations for the xmlResponse in python
...
load_date = ...
stats_data1 = ...

return Row(load_date, stats_data1, stats_data2, stats_group)

schema = schema = StructType([StructField('load_date', DateType()),
StructField('stats_data1', FloatType()),
StructField('stats_data2', ArrayType(IntegerType())),
StructField('stats_group', StringType())
])
df_with_xml_stats = original_df.rdd
.map(extract_metrics_from_xml)
.toDF(schema=schema, sampleRatio=1)
.cache()

最新更新