PairRDD,每个键初始化一次变量



我有一个大RDD[(K, V)]。为了映射值,我需要每个键通用的大型数据结构,并且构建成本很高。我无法执行groupByKey并在之后执行flatMap,因为每个键的值都不适合内存。我无法加载所有结构,因为它们也不适合内存。如何为每个组执行一次结构初始化(或最少次数),然后将其删除?

用例

  • 我们有一个RDD[String,String]。键指示值的语言,该值是该语言的短文本。
  • 我们想对值的一些标记进行分类。为此,我们需要为每种语言构建一个trie,其中包含一些令牌的类别。
  • 构建trie是昂贵的,所以我们不能为每个(K,V)对构建它。单个尝试将适合内存,但保留所有语言的尝试不会(考虑到不同键的数量)。
  • 因此,我们需要一种方法来构建最少的次数,并且只在内存中保留其中的几个次数。

例如,您可以将RDDrepartitionAndSortWithinPartitions一起使用,后跟mapPartitions

val partitioner: org.apache.spark.Partitioner = ???
rdd.repartitionAndSortWithinPartition(partitioner).mapPartitions { iter => {
var currentKey: Option[String] = None
var currentTrie: Option[Trie] = None 
iter.map {
case (k, v) => 
.. // if Option(k) != currentKey update currentKey and currentTrie
.. // Proceed with logic
}
}}

Dataset,后跟groupBy后跟flatMapGroups

rdd.toDS.groupByKey(_._1).flatMapGroups { case (key, iter) => {
val currentTrie: Trie = ???
iter.map { case (_, v) => ??? }
})

RDD对应的不同Dataset不必一次将所有值加载到内存中,因此各个组的大小应该不是问题。

这两种解决方案都需要完全随机播放,但每个引用结构将只为每个键初始化一次。

根据您的用例,您可以为此大型数据结构使用广播变量

val broadcastVar = sc.broadcast(LargeThingy())
broadcastVar.value

http://spark.apache.org/docs/latest/programming-guide.html#broadcast-variables

或者你可以使用rdd.foreachPartition,为每个分区初始化一个大东西,然后处理分区中的数据:

rdd.foreachPartition { case (data) =>
val largeThing = LargeThing()
data.foreach { //etc. }
}

最新更新