我想根据一行中的值将一个RDD拆分为多个RDD。行中的值是已知的,本质上是固定的。
例如
source_rdd = sc.parallelize([('a',1),('a',2),('a',3),('b',4),('b',5),('b',6)])
应该分为两个RDD,其中一个只包含a
,另一个只含有b
作为密钥
- 我尝试过
groupByKey
方法,并且在对分组RDD执行collect()
操作后能够成功执行,但由于内存限制,我在生产中无法执行该操作
a_rdd, b_rdd = source_rdd.keyBy(lambda row: row[0]).groupByKey().collect()
- 当前的实现是应用多个筛选器操作来获取每个RDD
a_rdd = source_rdd.filter(lambda row: row[0] == 'a')
b_rdd = source_rdd.filter(lambda row: row[0] == 'b')
这能进一步优化吗?在生产中使用无法放入内存的数据时,最好的方法是什么?
用法:这些RDD将被转换为不同的数据帧(每个键一个(,每个数据帧具有不同的模式,并存储在S3中作为输出。
注意:我更喜欢pyspark
实现。我读了很多堆栈溢出的答案和博客,但无论如何都找不到对我有效的答案。
我已经看到了一个标记为重复的问题,我已经在我的问题中提到了这个问题。我问了这个问题,因为提供的解决方案似乎不是最优化的方式,而且已经有3年的历史了。
您也可以使用toDF
。Aslo、a_rdd
和b_rdd
在您的代码中不是rdd
,因为它们是收集的!
df = source_rdd.keyBy(lambda row: row[0]).groupByKey()
a_rdd = df.filter(lambda row: row[0] == 'a')
b_rdd = df.filter(lambda row: row[0] == 'b')