根据值将RDD拆分为多个RDD,而不执行"collect()"和"filter()&quo



我想根据一行中的值将一个RDD拆分为多个RDD。行中的值是已知的,本质上是固定的。

例如

source_rdd = sc.parallelize([('a',1),('a',2),('a',3),('b',4),('b',5),('b',6)])

应该分为两个RDD,其中一个只包含a,另一个只含有b作为密钥

  1. 我尝试过groupByKey方法,并且在对分组RDD执行collect()操作后能够成功执行,但由于内存限制,我在生产中无法执行该操作
a_rdd, b_rdd = source_rdd.keyBy(lambda row: row[0]).groupByKey().collect()
  1. 当前的实现是应用多个筛选器操作来获取每个RDD
a_rdd = source_rdd.filter(lambda row: row[0] == 'a')
b_rdd = source_rdd.filter(lambda row: row[0] == 'b')

这能进一步优化吗?在生产中使用无法放入内存的数据时,最好的方法是什么?

用法:这些RDD将被转换为不同的数据帧(每个键一个(,每个数据帧具有不同的模式,并存储在S3中作为输出。

注意:我更喜欢pyspark实现。我读了很多堆栈溢出的答案和博客,但无论如何都找不到对我有效的答案。

我已经看到了一个标记为重复的问题,我已经在我的问题中提到了这个问题。我问了这个问题,因为提供的解决方案似乎不是最优化的方式,而且已经有3年的历史了。

您也可以使用toDF。Aslo、a_rddb_rdd在您的代码中不是rdd,因为它们是收集的!

df = source_rdd.keyBy(lambda row: row[0]).groupByKey()
a_rdd = df.filter(lambda row: row[0] == 'a')
b_rdd = df.filter(lambda row: row[0] == 'b')