很抱歉在最初的问题中出现混淆。以下是一个可复制示例的问题:
我有一个[String]
的rdd和一个[String, Long]
的rdd。我希望基于第二个String
与第一个String
的匹配来获得[Long]
的rdd。示例:
//Create RDD
val textFile = sc.parallelize(Array("Spark can also be used for compute intensive tasks",
"This code estimates pi by throwing darts at a circle"))
// tokenize, result: RDD[(String)]
val words = textFile.flatMap(line => line.split(" "))
// create index of distinct words, result: RDD[(String,Long)]
val indexWords = words.distinct().zipWithIndex()
因此,我希望有一个RDD,其中包含单词索引,而不是"Spark can also be used for compute intensive tasks"
中的单词。
再次抱歉并感谢
如果我理解正确,你会对Spark can also be used for compute intensive tasks
中出现的作品索引感兴趣。
如果是这样的话,这里有两个输出相同但性能特征不同的版本:
val lookupWords: Seq[String] = "Spark can also be used for compute intensive tasks".split(" ")
// option 1 - use join:
val lookupWordsRdd: RDD[(String, String)] = sc.parallelize(lookupWords).keyBy(w => w)
val result1: RDD[Long] = indexWords.join(lookupWordsRdd).map { case (key, (index, _)) => index }
// option 2 - assuming list of lookup words is short, you can use a non-distributed version of it
val result2: RDD[Long] = indexWords.collect { case (key, index) if lookupWords.contains(key) => index }
第一个选项用我们感兴趣的单词创建第二个RDD,使用keyBy
将其转换为PairRDD(key==value!),join
将其与indexWords
RDD一起使用,然后映射以仅获取索引。
第二个选项应该只在已知"感兴趣的单词"列表不太大的情况下使用,因此我们可以将其保留为列表(而不是RDD
),并让Spark将其序列化并发送给每个任务的工作人员。然后,我们使用collect(f: PartialFunction[T, U])
,它应用这个偏函数来同时获得一个"filter"和一个"map"——我们只在单词存在于列表中时返回一个值,如果存在,我们返回索引。
我得到了一个SPARK-5063的错误,给出了这个答案,我找到了问题的解决方案:
//broadcast `indexWords`
val bcIndexWords = sc.broadcast(indexWords.collectAsMap)
// select `value` of `indexWords` given `key`
val result = textFile.map{arr => arr.split(" ").map(elem => bcIndexWords.value(elem))}
result.first()
res373: Array[Long] = Array(3, 7, 14, 6, 17, 15, 0, 12)