缓存 Spark 数据帧以增强速度



>我有一个函数,它将数据帧列表连接到基本数据帧并返回数据帧。我正在尝试减少此操作所需的时间。由于我使用基本数据帧多次联接,因此我缓存了它,但运行时仍然相似。这是我正在使用的功能

def merge_dataframes(base_df, df_list, id_col):
"""
Joins multiple dataframes using an identifier variable common across datasets
:param base_df: everything will be added to this dataframe
:param df_list: dfs that have to be joined to main dataset
:param id_col: the identifier column
:return: dataset with all joins
"""
base_df.persist(StorageLevel.MEMORY_AND_DISK)
for each_df in df_list:
base_df = base_df.join(each_df, id_col)
base_df.unpersist()
return base_df

我很惊讶在缓存后得到类似的结果。这背后的原因是什么,我能做些什么来减少这消耗更少的时间。

另外,由于我目前使用的数据集相对较小(~50k 条记录(,因此只要我取消缓存数据集,我就没有在需要时缓存数据集的问题。

Join 是一种转换 - 此时不会触发任何计算

第一:

您在操作之前unpersist()它。

尝试删除unpersist,看看会发生什么。

第二:

恐怕在您的情况下,您无法从持久性中受益,因为代码中编写的内容与以下内容相同:

base_df.join(df1, id_col).join(df2, id_col).join(df3, id_col)...

在这种情况下,base_df只计算一次,以后只进一步使用base_df.join()的结果。这意味着base_df不会被重复使用。

下面是可以重用的示例:

base_df.join(df1, id_col)
base_df.join(df2, id_col)

但这不符合您的要求。 根据base_dflist_df的创建方式(它们的创建方式(,您可能需要考虑使用相同的分区程序对这些数据帧进行预分区 - 在这种情况下join操作不会导致随机播放,这将大大提高性能。

另一种方法是broadcast join如果list_dfs中的数据帧相对较小。

最新更新