我是Spark和Scala的新手,这就是为什么我很难度过这段时间。
我打算做的是使用Spark与Stanford CoreNLP预处理我的数据。我知道我必须使用mapPartitions
,以便按照本线程中的建议,每个分区有一个StanfordCoreNLP
实例。然而,我不知道/不了解如何从这里开始。
最后,我想在这些数据上训练单词向量,但现在我很乐意了解如何从这里获得处理后的数据,并将其写入另一个文件。
这就是我目前所得到的:
import java.util.Properties
import com.google.gson.Gson
import edu.stanford.nlp.ling.CoreAnnotations.{LemmaAnnotation, SentencesAnnotation, TokensAnnotation}
import edu.stanford.nlp.pipeline.{Annotation, StanfordCoreNLP}
import edu.stanford.nlp.util.CoreMap
import masterthesis.code.wordvectors.Review
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.JavaConversions._
object ReviewPreprocessing {
def main(args: Array[String]) {
val resourceUrl = getClass.getResource("amazon-reviews/reviews_Electronics.json")
val file = sc.textFile(resourceUrl.getPath)
val linesPerPartition = file.mapPartitions( lineIterator => {
val props = new Properties()
props.put("annotators", "tokenize, ssplit, pos, lemma")
val sentencesAsTextList : List[String] = List()
val pipeline = new StanfordCoreNLP(props)
val gson = new Gson()
while(lineIterator.hasNext) {
val line = lineIterator.next
val review = gson.fromJson(line, classOf[Review])
val doc = new Annotation(review.getReviewText)
pipeline.annotate(doc)
val sentences : java.util.List[CoreMap] = doc.get(classOf[SentencesAnnotation])
val sb = new StringBuilder();
sentences.foreach( sentence => {
val tokens = sentence.get(classOf[TokensAnnotation])
tokens.foreach( token => {
sb.append(token.get(classOf[LemmaAnnotation]))
sb.append(" ")
})
})
sb.setLength(sb.length - 1)
sentencesAsTextList.add(sb.toString)
}
sentencesAsTextList.iterator
})
System.exit(0)
}
}
例如,我将如何将这个结果写入一个文件?订购在这里并不重要——我想订购在这一点上已经丢失了。
如果您在RDD
上使用saveAsTextFile
,那么您最终会拥有与您拥有的分区一样多的输出文件。为了只有一个,你可以将所有东西合并到一个分区中,比如
sc.textFile("/path/to/file")
.mapPartitions(someFunc())
.coalesce(1)
.saveAsTextFile("/path/to/another/file")
或者(只是为了好玩)你可以把所有的分区一个接一个地放到驱动程序中,然后自己保存所有的数据。
val it = sc.textFile("/path/to/file")
.mapPartitions(someFunc())
.toLocalIterator
while(it.hasNext) {
writeToFile(it.next())
}