MongoDB对象没有添加到rrd-foreach循环casbah-scala-apache-spark的内部



好吧,大师,我遇到了一个对我来说没有多大意义的问题。我一直在尝试将一个对象保存到mongodb,它看起来像(大致)

{data:[baseball:[{timestamp (essentially):tweet},{timestamp: another tweet}]
       football:[{timestamp:footballtweet},{timestamp:differentfootballtweet}]
      ]
 timeInterval:"last minute to this minute" ( i'm doing timeseries data)
 terms:["football","baseball"]
}

请参阅下面我坚持的循环。注意,这个问题可能与rrd即将到期。我试图通过将它保存在内存中来修复它,但我不知道该怎么办。

   twitterStream.foreachRDD(rrd => {
          val entryToSave = MongoDBObject()
          val termWithTweets = MongoDBObject()
          rrd.persist()
          filters.foreach(term =>{
          var listOfTweets = MongoDBObject()
            rrd.persist()
            for(status <- rrd){
              if(status.getText.contains(term)) {
    //            listOfTweets += status
//Why doesnt this line below actually add the key value pair to the variable
//defined outside of the "for(status <- rrd)" loop? I know ( through debugging)
//that it does in fact append inside the loop.
                listOfTweets += (DateTime.now.toString() -> status.toString)
              }
            }
//when I print the listOfTweets outside of the for loop it is empty, Why?
            println("outsideRRD",listOfTweets)
              termWithTweets += (term -> listOfTweets)
          })
          entryToSave += ("data" -> termWithTweets)
          entryToSave += ("timeInterval" -> (DateTime.lastMinute to DateTime.now).toString)
          entryToSave += ("terms" -> filters)
          collection.insert(entryToSave)
        })

我不认为这是一个val/var问题,尽管它可能是。我已经尝试过了双向

RDD上的计算分布在集群上。不能从RDD中更新在RDD操作闭包之外创建的变量。它们基本上位于两个不同的位置:变量在Spark驱动程序中创建,并在worker中访问,应被视为只读。

Spark支持在这种情况下可以使用的分布式累积器:火花累积器

另一种选择(我更喜欢)是将RDD流转换为所需的数据格式,并使用foreachRDD方法将其持久化到辅助存储中。这将是一种更实用的解决问题的方法。大致如下:

  val filteredStream = twitterStream.filter(entry => filters.exists(term => entry.getText.getStatus.contains(term)))
  val filteredStreamWithTs = filteredStream.map(x => ((DateTime.now.toString(), x)))
  filteredStreamWithTs.foreachRdd(rdd => // write to Mongo)

最新更新