如何用RDD类型[String,Int]的值替换[String]的RDD类型



很抱歉在最初的问题中出现混淆。以下是一个可复制示例的问题:

我有一个[String]的rdd和一个[String, Long]的rdd。我希望基于第二个String与第一个String的匹配来获得[Long]的rdd。示例:

//Create RDD
val textFile = sc.parallelize(Array("Spark can also be used for compute intensive tasks",
      "This code estimates pi by throwing darts at a circle"))
// tokenize, result: RDD[(String)]
val words = textFile.flatMap(line => line.split(" "))
// create index of distinct words, result:  RDD[(String,Long)]
val indexWords = words.distinct().zipWithIndex()

因此,我希望有一个RDD,其中包含单词索引,而不是"Spark can also be used for compute intensive tasks"中的单词。

再次抱歉并感谢

如果我理解正确,你会对Spark can also be used for compute intensive tasks中出现的作品索引感兴趣。

如果是这样的话,这里有两个输出相同但性能特征不同的版本:

val lookupWords: Seq[String] = "Spark can also be used for compute intensive tasks".split(" ")
// option 1 - use join:
val lookupWordsRdd: RDD[(String, String)] = sc.parallelize(lookupWords).keyBy(w => w)
val result1: RDD[Long] = indexWords.join(lookupWordsRdd).map { case (key, (index, _)) => index }
// option 2 - assuming list of lookup words is short, you can use a non-distributed version of it
val result2: RDD[Long] = indexWords.collect { case (key, index) if lookupWords.contains(key) => index }

第一个选项用我们感兴趣的单词创建第二个RDD,使用keyBy将其转换为PairRDD(key==value!),join将其与indexWords RDD一起使用,然后映射以仅获取索引。

第二个选项应该只在已知"感兴趣的单词"列表不太大的情况下使用,因此我们可以将其保留为列表(而不是RDD),并让Spark将其序列化并发送给每个任务的工作人员。然后,我们使用collect(f: PartialFunction[T, U]),它应用这个偏函数来同时获得一个"filter"和一个"map"——我们只在单词存在于列表中时返回一个值,如果存在,我们返回索引。

我得到了一个SPARK-5063的错误,给出了这个答案,我找到了问题的解决方案:

//broadcast `indexWords`
val bcIndexWords = sc.broadcast(indexWords.collectAsMap)
// select `value` of `indexWords` given `key`
val result = textFile.map{arr => arr.split(" ").map(elem => bcIndexWords.value(elem))}
result.first()
res373: Array[Long] = Array(3, 7, 14, 6, 17, 15, 0, 12)

最新更新