Spark DataFrame每个文档的字数,每个文档的单行



我使用的是Spark 1.5.2和Java API。有没有办法创建一个包含单词的DataFrame包含所有单词的每个文档的计数以及每个文档单行的计数?

到目前为止,我已经能够使用"org.apache.spark.sql.functions.bloop"来转换每个单词在文档文本中插入新行。

然后,我可以使用以下代码创建一个新的DataFrame,其中包含多行中的每个文档、单词和字数:

df = df.orderBy("doc_id").groupBy(df.col("doc_id"), df.col("word")).count(); 

输出:

+------+-----------+-----+
|doc_id|       word|count|
+------+-----------+-----+
|doc_1 |       game|    2|
|doc_1 |       life|    1|
|doc_1 |everlasting|    1|
|doc_1 |      learn|    1|
|doc_2 |    special|    1|
|doc_2 |     moment|    1|
|doc_2 |       time|    1|
|doc_3 | unexamined|    1|
|doc_3 |       life|    1|
|doc_3 |      worth|    1|
|doc_3 |       live|    1|
+------+-----------+-----+

如何创建以下格式的DataFrame:

 +------+-----------+---------------------------------+
 |doc_id|      word_counts|
 +------+-----------+------------------------------+
 |doc_1 |{game=1, learn=2, everlating=1, life=1}
 |doc_2 |{special=1, moment=2, everlating=1, time=1}

谢谢。任何想法都非常感谢

我一开始就不会使用explode。如果从每行文档开始,则可以直接使用计算计数,例如使用ML转换器。一个非常简单的例子如下:

import org.apache.spark.ml.feature.Tokenizer
import org.apache.spark.ml.feature.CountVectorizer
val df = sc.parallelize(Seq(
  ("doc_1", "game game life everlasting learn"),
  ("doc_2", "special moment time unexamined"),
  ("doc_3", "life worth live")
)).toDF("doc_id", "text")
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val tokenized = tokenizer.transform(df)
val cvModel = new CountVectorizer()
  .setInputCol("words")
  .setOutputCol("features")
  .fit(tokenized)
val counted = cvModel.transform(tokenized)

此时,您已经对每个文档进行了计数。在每行中显式地保留令牌是相当浪费的,但可以使用小型UDF:来完成

import org.apache.spark.mllib.linalg.{SparseVector, Vector} 
def vectorsToMaps(vocabulary: Array[String]) = {
  udf((v: Vector) => {
    val sv = v.toSparse
    sv.indices.map(i => (vocabulary(i) -> sv(i))).toMap
  })
}
counted.select(vectorsToMaps(cvModel.vocabulary)($"features")
  .alias("freqs"))
  .show(3, false)
// +------------------------------------------------------------------+
// |freqs                                                             |
// +------------------------------------------------------------------+
// |Map(game -> 2.0, life -> 1.0, learn -> 1.0, everlasting -> 1.0)   |
// |Map(moment -> 1.0, special -> 1.0, unexamined -> 1.0, time -> 1.0)|
// |Map(life -> 1.0, live -> 1.0, worth -> 1.0)                       |
// +------------------------------------------------------------------+

您可以下拉到RDD并使用aggregateByKey:

df.rdd
  .aggregateByKey(Map[String,Int]())
  (
    (wordMap, word) => wordMap + (word -> (1 + wordMap.getOrElse(word, 0))), 
    (wordMap1, wordMap2) => wordMap1 ++ wordMap2.map{ case(k,v) => (k -> (v + wordMap1.getOrElse(k,0))) }
  )

相关内容

  • 没有找到相关文章

最新更新