如何将RDD[(Key, Value)]转换为Map[Key, RDD[Value]]



我搜索了一个解决方案很长时间,但没有得到任何正确的算法。

在scala中使用Spark RDD,我如何将RDD[(Key, Value)]转换为Map[key, RDD[Value]],知道我不能使用收集或其他可能将数据加载到内存中的方法?

事实上,我的最终目标是按键循环Map[Key, RDD[Value]],并为每个RDD[Value]调用saveAsNewAPIHadoopFile

例如,如果我得到:

RDD[("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)]

我想要 :

Map[("A" -> RDD[1, 2, 3]), ("B" -> RDD[4, 5]), ("C" -> RDD[6])]

我想知道在RDD[(Key, Value)]的每个键 A、B、C 上使用 filter 是否会花费太多,但我不知道调用过滤器的时间是否不同,会更有效?(当然不是,但也许使用cache

谢谢

你应该像这样使用代码(Python):

rdd = sc.parallelize( [("A", 1), ("A", 2), ("A", 3), ("B", 4), ("B", 5), ("C", 6)] ).cache()
keys = rdd.keys().distinct().collect()
for key in keys:
    out = rdd.filter(lambda x: x[0] == key).map(lambda (x,y): y)
    out.saveAsNewAPIHadoopFile (...)
一个RDD

不能是另一个RDD的一部分,您没有选择只收集密钥并将其相关值转换为单独的RDD。在我的示例中,您将遍历缓存的RDD,这很好并且工作速度很快

听起来您真正想要的是将KV RDD保存到每个键的单独文件中。与其创建Map[Key, RDD[Value]]不如考虑使用类似于此处示例的MultipleTextOutputFormat。代码几乎全部在示例中。

这种方法的好处是,您可以保证在随机播放后只通过RDD一次,并且可以获得所需的相同结果。如果您按照另一个答案中的建议通过过滤和创建多个 ID 来执行此操作(除非您的来源支持下推过滤器),您最终将为每个单独的键对数据集进行一次传递,这会慢得多。

这是我的简单测试代码。

val test_RDD = sc.parallelize(List(("A",1),("A",2), ("A",3),("B",4),("B",5),("C",6)))
val groupby_RDD = test_RDD.groupByKey()
val result_RDD = groupby_RDD.map{v => 
    var result_list:List[Int] = Nil
    for (i <- v._2) {
        result_list ::= i
    }
    (v._1, result_list)
}

结果如下

result_RDD.take(3)
>> res86: Array[(String, List[Int])] = Array((A,List(1, 3, 2)), (B,List(5, 4)), (C,List(6)))

或者你可以这样做

val test_RDD = sc.parallelize(List(("A",1),("A",2), ("A",3),("B",4),("B",5),("C",6)))
val nil_list:List[Int] = Nil
val result2 = test_RDD.aggregateByKey(nil_list)(
    (acc, value) => value :: acc,
    (acc1, acc2) => acc1 ::: acc2 )

结果是这样的

result2.take(3)
>> res209: Array[(String, List[Int])] = Array((A,List(3, 2, 1)), (B,List(5, 4)), (C,List(6)))

最新更新