安装管道并处理数据



我有一个包含文本的文件。我要做的是使用管道来使文本施加,删除止动词并产生2克。

我到目前为止所做的:

步骤1:读取文件

val data = sparkSession.read.text("data.txt").toDF("text")

步骤2:构建管道

val pipe1 = new Tokenizer().setInputCol("text").setOutputCol("words")
val pipe2 = new StopWordsRemover().setInputCol("words").setOutputCol("filtered")
val pipe3 = new NGram().setN(2).setInputCol("filtered").setOutputCol("ngrams")
val pipeline = new Pipeline().setStages(Array(pipe1, pipe2, pipe3))
val model = pipeline.fit(data)

我知道pipeline.fit(data)会产生PipelineModel,但是我不知道如何使用PipelineModel

任何帮助都将不胜感激。

运行val model = pipeline.fit(data)代码时,所有Estimator阶段(即:机器学习任务,例如分类,回归,聚类等)都适合数据,并且创建了Transformer阶段。您只有Transformer阶段,因为您在此管道中进行功能创建。

为了执行您的模型,现在仅由Transformer阶段组成,您需要运行val results = model.transform(data)。这将对您的数据框架执行每个Transformer阶段。因此,在model.transform(data)进程的末尾,您将拥有一个由原始行,令牌输出,stopwordsremover输出和最后的NGRAM结果组成的数据框架。

可以通过SparkSQL查询执行功能创建完成后的前5个NGRAM。首先爆炸ngram列,然后计数groupby ngrams,以下降的方式按计数列进行订购,然后执行show(5)。另外,您可以使用"LIMIT 5方法而不是show(5)

顺便说一句,您可能应该将对象名称更改为不是标准类名称的东西。否则,您将遇到模棱两可的范围错误。

代码:

import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.Tokenizer
import org.apache.spark.sql.SparkSession._
import org.apache.spark.sql.functions._
import org.apache.spark.ml.feature.NGram
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.ml.{Pipeline, PipelineModel}
object NGramPipeline {
    def main() {
        val sparkSession = SparkSession.builder.appName("NGram Pipeline").getOrCreate()
        val sc = sparkSession.sparkContext
        val data = sparkSession.read.text("quangle.txt").toDF("text")
        val pipe1 = new Tokenizer().setInputCol("text").setOutputCol("words")
        val pipe2 = new StopWordsRemover().setInputCol("words").setOutputCol("filtered")
        val pipe3 = new NGram().setN(2).setInputCol("filtered").setOutputCol("ngrams")
        val pipeline = new Pipeline().setStages(Array(pipe1, pipe2, pipe3))
        val model = pipeline.fit(data)
        val results = model.transform(data)
        val explodedNGrams = results.withColumn("explNGrams", explode($"ngrams"))
        explodedNGrams.groupBy("explNGrams").agg(count("*") as "ngramCount").orderBy(desc("ngramCount")).show(10,false)
    }
}
NGramPipeline.main()



输出:

+-----------------+----------+
|explNGrams       |ngramCount|
+-----------------+----------+
|quangle wangle   |9         |
|wangle quee.     |4         |
|'mr. quangle     |3         |
|said, --         |2         |
|wangle said      |2         |
|crumpetty tree   |2         |
|crumpetty tree,  |2         |
|quangle wangle,  |2         |
|crumpetty tree,--|2         |
|blue babboon,    |2         |
+-----------------+----------+
only showing top 10 rows

请注意,有语法(逗号,破折号等)导致线路重复。执行Ngrams时,通常是一个好主意。您通常可以使用正则罚款。

最新更新