对于spark来说,什么是最快的数据结构?元组的映射或列表



我的数据可以以两种形式存在,

RDD[Map[String, Map[String, Int]]] 

RDD[List[(String, List[(String,Int)])]]

可以看到,在第二个示例中,数据的"映射"是通过元组的第一个元素作为键来实现的。考虑rdd中的两个元素,分别叫它们R1和R2。我将在R1和R2中按键合并。当R1和R2都包含相同的键时,我对这些值进行进一步合并。例如,假设R1和R2都包含一个条目

outer_key1 -> (inner_key1 -> 1)

则合并结果将生成

outer_key1 -> (inner_key1 -> 2)

所以,我的问题是哪种数据结构更快,内存效率更高,spark通过外部和内部键来减少?(key, list_of_tuple)的映射或列表的映射。我的直觉是,考虑到它们的0(1)查找,映射在按键减少方面会更快。然而,考虑到大多数地图的实现方式,我确信基于地图的RDDS会浪费相当多的内存。

作为这种类型合并的实际示例,我的rdd表示

Map(email_address->(date->number_of_emails_recieved_that_day)) where each RDD contains many email addresses

我认为您误解了RDD的概念。您需要将您的数据转换为适当的结构,以利用RDD的功能。

所以,你需要考虑你想要计算什么来决定你的RDD。

根据我对你问题的理解。你有两个数据源你想要合并这两个数据源的数据。所以你从这些来源创建你的2个RDD,然后你合并它们。
// First we will have to create RDD's from our data source.    
// create RDD from source 1
// Lets say you have a List[(String, List[(String, Int)]]
val src1 = List(
  ("email1@example.com", List(("01/01/2016", 10), ("05/01/2016", 20)))
  ("email2@example.com", List(("01/01/2016", 5), ("06/01/2016", 30))
)
// Now enters spark
val rddSrc1: RDD[(String, List[String, Int])] = sc.parallelize(src1)

// create RDD from source 2
// Lets say you have a Map[(String, Map[String, Int]]
val src2 = Map(
  "email1@example.com" -> Map("01/01/2016" -> 10, "05/01/2016" -> 20)
  "email2@example.com" -> Map("01/01/2016" -> 5, "06/01/2016" -> 30)
)
// Now enters spark
val rddSrc1: RDD[(String, Map[String, Int])] = sc.parallelize(src2.toList)

// Now since you want to merge on both "email" and "date" lets make ("email", "date") tuple as key.
rddSrc1T: RDD[(String, String), Int] = rddSrc1
  .flatMap({
     case (email, list) => list.map({
       case (date, number) => ((email, date), number)
     })
   })
rddSrc2T: RDD[(String, String), Int] = rddSrc1
  .flatMap({
     case (email, map) => map.toList.map({
       case (date, number) => ((email, date), number)
     })
   })
// now co-group the 2 RDD's
rddCogroup: RDD[((String, String), Iterable[Int], Iterable[Int])) = rddSrc1T.cogroup(rddSrc2T)
val totalNumberRdd: RDD[((String, String), Int] = rddCogroup.map({
  case ((email, date), iter1, iter2) => ((email, date), iter1.sum + iter2.sum)
})

相关内容

  • 没有找到相关文章

最新更新