好吧,大师,我遇到了一个对我来说没有多大意义的问题。我一直在尝试将一个对象保存到mongodb,它看起来像(大致)
{data:[baseball:[{timestamp (essentially):tweet},{timestamp: another tweet}]
football:[{timestamp:footballtweet},{timestamp:differentfootballtweet}]
]
timeInterval:"last minute to this minute" ( i'm doing timeseries data)
terms:["football","baseball"]
}
请参阅下面我坚持的循环。注意,这个问题可能与rrd即将到期。我试图通过将它保存在内存中来修复它,但我不知道该怎么办。
twitterStream.foreachRDD(rrd => {
val entryToSave = MongoDBObject()
val termWithTweets = MongoDBObject()
rrd.persist()
filters.foreach(term =>{
var listOfTweets = MongoDBObject()
rrd.persist()
for(status <- rrd){
if(status.getText.contains(term)) {
// listOfTweets += status
//Why doesnt this line below actually add the key value pair to the variable
//defined outside of the "for(status <- rrd)" loop? I know ( through debugging)
//that it does in fact append inside the loop.
listOfTweets += (DateTime.now.toString() -> status.toString)
}
}
//when I print the listOfTweets outside of the for loop it is empty, Why?
println("outsideRRD",listOfTweets)
termWithTweets += (term -> listOfTweets)
})
entryToSave += ("data" -> termWithTweets)
entryToSave += ("timeInterval" -> (DateTime.lastMinute to DateTime.now).toString)
entryToSave += ("terms" -> filters)
collection.insert(entryToSave)
})
我不认为这是一个val/var问题,尽管它可能是。我已经尝试过了双向
RDD上的计算分布在集群上。不能从RDD中更新在RDD操作闭包之外创建的变量。它们基本上位于两个不同的位置:变量在Spark驱动程序中创建,并在worker中访问,应被视为只读。
Spark支持在这种情况下可以使用的分布式累积器:火花累积器
另一种选择(我更喜欢)是将RDD流转换为所需的数据格式,并使用foreachRDD
方法将其持久化到辅助存储中。这将是一种更实用的解决问题的方法。大致如下:
val filteredStream = twitterStream.filter(entry => filters.exists(term => entry.getText.getStatus.contains(term)))
val filteredStreamWithTs = filteredStream.map(x => ((DateTime.now.toString(), x)))
filteredStreamWithTs.foreachRdd(rdd => // write to Mongo)