我是Pyspark的新手,对如何思考这个问题有点困惑。
我有一个大型数据帧,我想根据两列过滤该数据帧的每个子集,并通过相同的算法运行它。
以下是我现在如何运行它(效率极低(的示例:
for letter in ['a', 'b', 'c']:
for number in [1, 2, 3]
filtered_DF_1, filtered_DF_2 = filter_func(DF_1, DF_2, letter, number)
process_function(filtered_DF_1, filtered_DF_2)
基本过滤功能:
def filter_func(DF_1, DF_2, letter, number):
DF_1 = DF_1.filter(
(F.col("Letter") == letter) &
(F.col('Number') == number)
)
DF_2 = DF_2.filter(
(F.col("Letter") == letter) &
(F.col('Number') == number)
)
return DF_1, DF_2
由于这是 Pyspark,我想并行化它,因为函数的每次迭代都可以独立运行。
我是否需要执行某种映射来获取所有数据子集? 然后我是否需要对process_function
执行任何操作以使其也可用于所有节点以运行并返回答案?
最好的方法是什么? 编辑:
该process_function
获取过滤后的数据集,并通过大约 7 个不同的函数运行它,这些函数已经写在 300 行 pyspark 中 ->最终目标是返回基于一堆复杂逻辑超额预订的时间戳列表。
我认为我的计划是建立一个字母字典 -> [数字],然后分解该列表以获取每个排列并从中创建数据集。然后映射它,希望能够为我的process_function创建一个 udf。
我认为您不需要担心并行化或执行计划,因为火花催化剂会在后台为您完成。最好避免UDF,您可以主要使用inbulit功能来完成。 您是在做转换函数还是在process_func内部进行聚合函数?
请提供任何测试数据和预期输出的适当示例。这将有助于更好地回答。