我搜索了一个解决方案很长时间,但没有得到任何正确的算法。
在scala中使用Spark RDD,我如何将RDD[(Key, Value)]
转换为Map[key, RDD[Value]]
,知道我不能使用收集或其他可能将数据加载到内存中的方法?
事实上,我的最终目标是按键循环Map[Key, RDD[Value]]
,并为每个RDD[Value]
调用saveAsNewAPIHadoopFile
例如,如果我得到:
RDD[("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)]
我想要 :
Map[("A" -> RDD[1, 2, 3]), ("B" -> RDD[4, 5]), ("C" -> RDD[6])]
我想知道在RDD[(Key, Value)]
的每个键 A、B、C 上使用 filter
是否会花费太多,但我不知道调用过滤器的时间是否不同,会更有效?(当然不是,但也许使用cache
?
谢谢
你应该像这样使用代码(Python):
rdd = sc.parallelize( [("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)] ).cache()
keys = rdd.keys().distinct().collect()
for key in keys:
out = rdd.filter(lambda x: x[0] == key).map(lambda (x,y): y)
out.saveAsNewAPIHadoopFile (...)
一个RDD不能是另一个RDD的一部分,您没有选择只收集密钥并将其相关值转换为单独的RDD。在我的示例中,您将遍历缓存的RDD,这很好并且工作速度很快
听起来您真正想要的是将KV RDD保存到每个键的单独文件中。与其创建Map[Key, RDD[Value]]
不如考虑使用类似于此处示例的MultipleTextOutputFormat
。代码几乎全部在示例中。
这种方法的好处是,您可以保证在随机播放后只通过RDD一次,并且可以获得所需的相同结果。如果您按照另一个答案中的建议通过过滤和创建多个 ID 来执行此操作(除非您的来源支持下推过滤器),您最终将为每个单独的键对数据集进行一次传递,这会慢得多。
这是我的简单测试代码。
val test_RDD = sc.parallelize(List(("A",1),("A",2), ("A",3),("B",4),("B",5),("C",6)))
val groupby_RDD = test_RDD.groupByKey()
val result_RDD = groupby_RDD.map{v =>
var result_list:List[Int] = Nil
for (i <- v._2) {
result_list ::= i
}
(v._1, result_list)
}
结果如下
result_RDD.take(3)
>> res86: Array[(String, List[Int])] = Array((A,List(1, 3, 2)), (B,List(5, 4)), (C,List(6)))
或者你可以这样做
val test_RDD = sc.parallelize(List(("A",1),("A",2), ("A",3),("B",4),("B",5),("C",6)))
val nil_list:List[Int] = Nil
val result2 = test_RDD.aggregateByKey(nil_list)(
(acc, value) => value :: acc,
(acc1, acc2) => acc1 ::: acc2 )
结果是这样的
result2.take(3)
>> res209: Array[(String, List[Int])] = Array((A,List(3, 2, 1)), (B,List(5, 4)), (C,List(6)))