(String, String) in Scala Apache Spark



> I have a file in which every line contains a (Stringx, Stringy) pair.

I want to find the number of occurrences of Stringy across the whole dataset. The code I have so far is:

val file = sc.textFile("s3n://bucket/test.txt") // RDD[ String ]
val splitRdd = file.map(line => line.split("\t"))
    // RDD[ Array[ String ] ]
val yourRdd = splitRdd.flatMap(arr => {
      val title = arr(0)
      val text = arr(1)
      val words = text.split(" ")
      words.map(word => (word, title))
    })
    // RDD[ ( String, String ) ]
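For reference, yourRdd ends up holding one (word, title) pair per word of the text field. A small local sketch of the same parsing (the sample lines here are made up):

val sample = sc.parallelize(Seq("t1\thello world", "t2\thello again"))
val samplePairs = sample
  .map(_.split("\t"))
  .flatMap(arr => arr(1).split(" ").map(word => (word, arr(0))))
samplePairs.collect() // Array((hello,t1), (world,t1), (hello,t2), (again,t2))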
scala> val s = yourRdd.map(word => ((word, scala.math.log(N/(file.filter(_.split("\t")(1).contains(word.split(",")(1))).count)))))
<console>:31: error: value split is not a member of (String, String)
       val s = yourRdd.map(word => ((word, scala.math.log(N/(file.filter(_.split("\t")(1).contains(word.split(",")(1))).count)))))

Here N = 20 (it is a fixed value). How should I fix this?

Update

Implementing Brenden's comment

scala> val s = yourRdd.map(word => (word, scala.math.log(N / file.filter(_.split("\t")(1).contains(word._1.split(",")(1))).count)))
s: org.apache.spark.rdd.RDD[((String, String), Double)] = MapPartitionsRDD[18] at map at <console>:33
scala> s.first()
15/04/23 15:43:44 INFO SparkContext: Starting job: first at <console>:36
15/04/23 15:43:44 INFO DAGScheduler: Got job 16 (first at <console>:36) with 1 output partitions (allowLocal=true)
15/04/23 15:43:44 INFO DAGScheduler: Final stage: Stage 17(first at <console>:36)
15/04/23 15:43:44 INFO DAGScheduler: Parents of final stage: List()
15/04/23 15:43:44 INFO DAGScheduler: Missing parents: List()
15/04/23 15:43:44 INFO DAGScheduler: Submitting Stage 17 (MapPartitionsRDD[18] at map at <console>:33), which has no missing parents
15/04/23 15:43:44 INFO MemoryStore: ensureFreeSpace(11480) called with curMem=234927, maxMem=277842493
15/04/23 15:43:44 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated size 11.2 KB, free 264.7 MB)
15/04/23 15:43:44 INFO MemoryStore: ensureFreeSpace(5713) called with curMem=246407, maxMem=277842493
15/04/23 15:43:44 INFO MemoryStore: Block broadcast_18_piece0 stored as bytes in memory (estimated size 5.6 KB, free 264.7 MB)
15/04/23 15:43:44 INFO BlockManagerInfo: Added broadcast_18_piece0 in memory on localhost:59043 (size: 5.6 KB, free: 264.9 MB)
15/04/23 15:43:44 INFO BlockManagerMaster: Updated info of block broadcast_18_piece0
15/04/23 15:43:44 INFO SparkContext: Created broadcast 18 from broadcast at DAGScheduler.scala:839
15/04/23 15:43:44 INFO DAGScheduler: Submitting 1 missing tasks from Stage 17 (MapPartitionsRDD[18] at map at <console>:33)
15/04/23 15:43:44 INFO TaskSchedulerImpl: Adding task set 17.0 with 1 tasks
15/04/23 15:43:44 INFO TaskSetManager: Starting task 0.0 in stage 17.0 (TID 22, localhost, PROCESS_LOCAL, 1301 bytes)
15/04/23 15:43:44 INFO Executor: Running task 0.0 in stage 17.0 (TID 22)
15/04/23 15:43:44 INFO HadoopRDD: Input split: file:/home/ec2-user/input/OUTPUT/temp:0+128629
15/04/23 15:43:44 ERROR Executor: Exception in task 0.0 in stage 17.0 (TID 22)
org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

It is looking for a value "split" on word (which would be a "def split" member). However, "word" is not a String, it is a (String, String), and tuples have no split method. I believe you mean word._1.split(",")(1), so the command becomes:

val s = yourRdd.map(word => (word, scala.math.log(N / file.filter(_.split("\t")(1).contains(word._1.split(",")(1))).count)))
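Note that this version still calls file.filter(...).count inside yourRdd.map, which is the nested-RDD pattern that SPARK-5063 forbids (see the exception in the update above). One way to get the same kind of result without nesting is to precompute the per-word document counts as their own RDD and join them back; a rough, untested sketch along those lines:

// Sketch: count in how many titles each word appears, then join that count
// back onto the (word, title) pairs instead of re-filtering `file` per element.
val docFreq = yourRdd.distinct()                 // unique (word, title) pairs
  .map { case (word, _) => (word, 1) }
  .reduceByKey(_ + _)                            // word -> number of titles containing it
val s = yourRdd.join(docFreq).map { case (word, (title, freq)) =>
  ((word, title), scala.math.log(N.toDouble / freq))
}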

Edit:

With the clarity maasg's answer brought to the real underlying problem, I realised that I need to count the unique instances of each word per title. I would upvote maasg's answer, but I don't have enough rep :(

import org.apache.spark.rdd.RDD

val sc = SparkApplicationContext.coreCtx
val N = 20
val rdd: RDD[String] = sc.parallelize(Seq("t1\thi how how you,you", "t1\tcat dog,cat,mouse how you,you"))
val splitRdd: RDD[Array[String]] = rdd.map(line => line.split("\t"))
// Unique words per title, then reduced by word into a count
val wordCountRdd = splitRdd.flatMap(arr =>
  arr(1).split(" |,").distinct // Including the comma because you seem to split on it later on, but I don't think you actually need to
    .map(word => (word, 1))
).reduceByKey { case (cumm, one) => cumm + one }
val s: RDD[(String, Double)] = wordCountRdd.map { case (word, freq) => (word, scala.math.log(N.toDouble / freq)) } // toDouble avoids integer division
s.collect().map(x => println(x._1 + ", " + x._2))
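For the two sample lines above, how and you appear in both documents, so they should print with roughly 2.30 (= log(20 / 2)), while hi, cat, dog and mouse each appear once and print roughly 3.00 (= log 20).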

As mentioned in the comments, it is not possible to use a "nested" RDD inside a closure that runs on another RDD. This requires a change of strategy. Assuming each title is unique, and trying to stay close to the lines of the original question, this could be an alternative that removes the need for the nested RDD computation:

val N = 20 // fixed document count, per the question
val file = sc.textFile("s3n://bucket/test.txt") // RDD[ String ]
val wordByTitle = file.flatMap{ line =>
    val split = line.split("\t")
    val title = split(0)
    val words = split(1).split(" ")
    words.map(w=> (w,title))
}
// we want the count of documents in which a word appears, 
// this is equivalent to counting distinct (word, title) combinations.
// note that replacing the title by a hash would save considerable memory
val uniqueWordPerTitle = wordByTitle.distinct()
// now we can calculate the word frequency across documents
val tdf = uniqueWordPerTitle.map{case (w, title) => (w,1)}.reduceByKey(_ + _)
// and the inverse document frequency per word.
val idf = tdf.map{case (word, freq) => (word, scala.math.log(N.toDouble / freq))} // toDouble avoids integer division
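To inspect the result on the driver (reasonable here because the vocabulary is small; collect() pulls everything into driver memory), something like:

idf.collect().foreach { case (word, value) => println(s"$word -> $value") }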
