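My data can exist in two forms,
RDD[Map[String, Map[String, Int]]]
or
RDD[List[(String, List[(String,Int)])]]
As you can see, in the second form the "mapping" is done by using the first element of each tuple as the key. Consider two elements of the RDD; call them R1 and R2. I will merge R1 and R2 by key. When R1 and R2 both contain the same key, I merge the values further. For example, suppose R1 and R2 both contain the entry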
outer_key1 -> (inner_key1 -> 1)
then the merge would produce
outer_key1 -> (inner_key1 -> 2)
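To make the merge rule above concrete, here is a minimal sketch in plain Scala (no Spark) for the Map-of-Maps form. The names NestedMerge, mergeInner and mergeOuter are made up for illustration, and it assumes that "merging" inner values means summing the counts, as in the example.

```scala
object NestedMerge {
  type Inner = Map[String, Int]

  // Merge two inner maps, summing the counts of keys present in both.
  def mergeInner(a: Inner, b: Inner): Inner =
    b.foldLeft(a) { case (acc, (k, v)) =>
      acc.updated(k, acc.getOrElse(k, 0) + v)
    }

  // Merge two outer maps; inner maps that share an outer key
  // are combined with mergeInner, all other entries are kept as-is.
  def mergeOuter(r1: Map[String, Inner], r2: Map[String, Inner]): Map[String, Inner] =
    r2.foldLeft(r1) { case (acc, (k, inner)) =>
      acc.updated(k, mergeInner(acc.getOrElse(k, Map.empty[String, Int]), inner))
    }
}
```

Since mergeOuter is associative and commutative, a function of this shape could be passed to RDD.reduce on an RDD[Map[String, Map[String, Int]]], although that collapses everything into a single map on the driver.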
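So my question is: which data structure is faster and more memory-efficient for Spark to reduce by outer and inner keys, a map of maps or a list of (key, list_of_tuples) pairs? My intuition is that maps will be faster at reducing by key, given their O(1) lookups. However, given how most map implementations work, I am sure that Map-based RDDs will waste a fair amount of memory.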
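For comparison, here is a rough sketch of the same merge on the List-of-tuples form, again in plain Scala. ListMerge and its methods are hypothetical names, and this is only one possible way to write it. It shows why the lookup cost matters: without a map, every merge has to rebuild a grouping by key. It makes no claim about actual timings or memory use in Spark.

```scala
object ListMerge {
  type Inner = List[(String, Int)]
  type Outer = List[(String, Inner)]

  // Concatenate both lists, group by inner key, and sum the counts.
  def mergeInner(a: Inner, b: Inner): Inner =
    (a ++ b).groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2).sum }.toList

  // Concatenate both lists, group by outer key, and merge the inner lists.
  // Note: the order of the resulting entries is not guaranteed.
  def mergeOuter(r1: Outer, r2: Outer): Outer =
    (r1 ++ r2).groupBy(_._1).map { case (k, entries) =>
      k -> entries.map(_._2).reduce((x, y) => mergeInner(x, y))
    }.toList
}
```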
As a practical example of this type of merge, my RDD represents
Map(email_address -> (date -> number_of_emails_received_that_day)), where each RDD contains many email addresses
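I think you have misunderstood the concept of an RDD. You need to transform your data into an appropriate structure to take advantage of what RDDs offer.
So you need to think about what you want to compute in order to decide on the shape of your RDD.
As I understand your question, you have two data sources and you want to merge the data from them. So you create two RDDs from those sources and then merge them.
// First we have to create RDDs from our data sources.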
// Assumes a SparkContext named `sc` is already in scope, as in spark-shell.
import org.apache.spark.rdd.RDD

// Create an RDD from source 1.
// Let's say you have a List[(String, List[(String, Int)])].
val src1 = List(
  ("email1@example.com", List(("01/01/2016", 10), ("05/01/2016", 20))),
  ("email2@example.com", List(("01/01/2016", 5), ("06/01/2016", 30)))
)
// Now Spark enters the picture.
val rddSrc1: RDD[(String, List[(String, Int)])] = sc.parallelize(src1)

// Create an RDD from source 2.
// Let's say you have a Map[String, Map[String, Int]].
val src2 = Map(
  "email1@example.com" -> Map("01/01/2016" -> 10, "05/01/2016" -> 20),
  "email2@example.com" -> Map("01/01/2016" -> 5, "06/01/2016" -> 30)
)
// Now Spark enters the picture.
val rddSrc2: RDD[(String, Map[String, Int])] = sc.parallelize(src2.toList)

// Since you want to merge on both "email" and "date", make the (email, date) tuple the key.
val rddSrc1T: RDD[((String, String), Int)] = rddSrc1.flatMap {
  case (email, list) => list.map { case (date, number) => ((email, date), number) }
}
val rddSrc2T: RDD[((String, String), Int)] = rddSrc2.flatMap {
  case (email, map) => map.toList.map { case (date, number) => ((email, date), number) }
}

// Now cogroup the two RDDs; each key maps to a pair of Iterables, one per source.
val rddCogroup: RDD[((String, String), (Iterable[Int], Iterable[Int]))] = rddSrc1T.cogroup(rddSrc2T)
val totalNumberRdd: RDD[((String, String), Int)] = rddCogroup.map {
  case ((email, date), (iter1, iter2)) => ((email, date), iter1.sum + iter2.sum)
}
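
If all you need is the per-(email, date) total, an alternative to cogroup is to union the two keyed RDDs and sum with reduceByKey. The sketch below is a standalone variant under that assumption. MergeCounts, mergeCounts and the sample data are made up for illustration, and it creates its own local SparkContext instead of relying on spark-shell's sc.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object MergeCounts {
  type Keyed = RDD[((String, String), Int)]

  // Sum the counts that share the same (email, date) key across both RDDs.
  def mergeCounts(a: Keyed, b: Keyed): Keyed =
    a.union(b).reduceByKey(_ + _)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("merge-counts").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val a = sc.parallelize(Seq((("email1@example.com", "01/01/2016"), 10)))
      val b = sc.parallelize(Seq(
        (("email1@example.com", "01/01/2016"), 10),
        (("email2@example.com", "06/01/2016"), 30)
      ))
      // Print one ((email, date), total) pair per distinct key.
      mergeCounts(a, b).collect().foreach(println)
    } finally {
      sc.stop()
    }
  }
}
```

reduceByKey combines values within each partition before the shuffle, so it avoids building the per-key Iterables that cogroup produces. cogroup is still the better fit when you need to keep the values from each source separate.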