Pyspark:根据列过滤 DF,然后通过函数运行每个子集 DF



我是Pyspark的新手,对如何思考这个问题有点困惑。

我有一个大型数据帧,我想根据两列过滤该数据帧的每个子集,并通过相同的算法运行它。

以下是我现在如何运行它(效率极低(的示例:

for letter in ['a', 'b', 'c']:
for number in [1, 2, 3]
filtered_DF_1, filtered_DF_2 = filter_func(DF_1, DF_2, letter, number)
process_function(filtered_DF_1, filtered_DF_2)

基本过滤功能:

def filter_func(DF_1, DF_2, letter, number):
DF_1 = DF_1.filter(
(F.col("Letter") == letter) &                                
(F.col('Number') == number)                          
)
DF_2 = DF_2.filter(
(F.col("Letter") == letter) &                                
(F.col('Number') == number)                         
)
return DF_1, DF_2

由于这是 Pyspark,我想并行化它,因为函数的每次迭代都可以独立运行。

我是否需要执行某种映射来获取所有数据子集? 然后我是否需要对process_function执行任何操作以使其也可用于所有节点以运行并返回答案?

最好的方法是什么? ​ 编辑:

process_function获取过滤后的数据集,并通过大约 7 个不同的函数运行它,这些函数已经写在 300 行 pyspark 中 ->最终目标是返回基于一堆复杂逻辑超额预订的时间戳列表。

我认为我的计划是建立一个字母字典 -> [数字],然后分解该列表以获取每个排列并从中创建数据集。然后映射它,希望能够为我的process_function创建一个 udf。

我认为您不需要担心并行化或执行计划,因为火花催化剂会在后台为您完成。最好避免UDF,您可以主要使用inbulit功能来完成。 您是在做转换函数还是在process_func内部进行聚合函数?

请提供任何测试数据和预期输出的适当示例。这将有助于更好地回答。

最新更新