在一个大数据集上一次性读取几次计算的最佳方法是什么



我正在处理一个非常大的数据集,其中包括Spark中的200个压缩JSON文件(每个文件约8G未压缩(。我创建了一个主数据帧largeDF,以及几个额外的数据帧,用于计算嵌套属性(即结构数组(上的聚合。我想执行一个通用的统计计算(填充率和组计数(。

对整个数据集的每次处理大约需要20分钟(加载文件、解压缩和执行聚合(。对于50个字段,这需要很长时间,因为每次我都要更改条件,并使用额外的筛选器一次又一次地运行查询。

我想依靠PySpark的懒惰评估,避免多次加载数据,这样我就可以创建一个复杂的聚合,并在整个数据集上应用一次,然后将所有结果转换为Pandas。或者更好的是,如果我可以预先定义作业,并要求Spark并行处理它们(加载一次,计算全部(,然后分别返回每个作业的结果。

这些不是我的主要ETL,但我正在尝试提取数据集的语义,以编写实际的ETL管道。

计算1:计算统计数据并查找所有字段的填充率:

stats = DF_large.describe().toPandas()

计算2:使用分类数据处理简单字段:

def group_count(df, col, limit, sort, skip_null):
"""This function groups data-set on based on provided column[s], and counts each group."""
if skip_null:
df = df.where(df[col].isNotNull())
if limit:
df = df.limit(limit) 
df = df.groupBy(col).count()
if sort:
df = df.sort(col, ascending=False)
return df.toPandas()
aggregations = {}
for col in group_count_list_of_columns:
aggregations[col] = group_count(largeDF, col, limit=0, skip_null=True, sort=False)

计算3:计数并计算嵌套字段的填充率:

def get_nested_fields(spDf, col : str, limit, othercols : tuple, stats = True):
"""This function unwinds a nested array field out of data-set based on provided column, and either returns the whole or statistics of it."""
spDf = spDf.where(spDf[col].isNotNull())
df = spDf.select(F.explode(col), *othercols)
if limit:
df = df.limit(limit)
if stats:
res = df.describe().toPandas()
else:
res = df.toPandas()
return res
nested_fields_aggregate = {}
for col in nested_fields_lists:
nested_fields_aggregate[col] = get_nested_field(largeDF, col, limit=10**4, othercols =['name', 'id', 'timestamp'], stats = True)

这需要多次读取整个数据集。形状不一样,所以我不能加入。理论上应该有一种减少时间的方法,因为没有一种计算是相互依赖的。

每次调用panda时,都会再次读取DF_large数据帧。为了避免这种情况,您可以使用DF_large = DF_large.cache()缓存此数据帧。

最新更新