Computing relative frequencies of bigrams in PySpark



I'm trying to count word pairs in a text file. First I do some preprocessing of the text, and then I count the word pairs, which gives results like:

((Aspire, to), 1) ; ((to, inspire), 4) ; ((inspire, before), 38)...

Now I want to report the 1000 most frequent pairs, sorted by:

  1. the word (the second word of the pair)
  2. the relative frequency (number of occurrences of the pair / total number of occurrences of the second word)
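
For example, if (to, inspire) occurs 4 times and "inspire" appears as the second word of, say, 10 pairs in total, its relative frequency would be 4 / 10 = 0.4 (the total of 10 is just a made-up number for illustration).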

Here is what I have done so far:

from pyspark.sql import SparkSession
import re
spark = SparkSession.builder.appName("Bigram occurences and relative frequencies").master("local[*]").getOrCreate()
sc = spark.sparkContext
# read the file, lowercase each line and split it into tokens
text = sc.textFile("big.txt")
tokens = text.map(lambda x: x.lower()).map(lambda x: re.split(r"[\s,.;:!?]+", x))
# build bigrams from consecutive tokens and count each pair
pairs = tokens.flatMap(lambda xs: (tuple(x) for x in zip(xs, xs[1:]))).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
frame = pairs.toDF(['pair', 'count'])
# Dataframe ordered by the most frequent pair to the least
most_frequent = frame.sort(frame['count'].desc())
# For each row, trying to add a column with the relative frequency, but I'm getting an error
with_rf = frame.withColumn("rf", frame['count'] / (frame.pair._2.sum()))

I think I'm fairly close to the result I want, but I can't figure it out. I'm new to Spark and DataFrames. I have also tried

import pyspark.sql.functions as F
frame.groupBy(frame['pair._2']).agg((F.col('count') / F.sum('count')).alias('rf')).show()

Any help would be greatly appreciated.

EDIT: here is a sample of the frame DataFrame:

+--------------------+-----+
|                pair|count|
+--------------------+-----+
|{project, gutenberg}|   69|
|  {gutenberg, ebook}|   14|
|         {ebook, of}|    5|
|    {adventures, of}|    6|
|           {by, sir}|   12|
|     {conan, doyle)}|    1|
|     {changing, all}|    2|
|         {all, over}|   24|
+--------------------+-----+
root
|-- pair: struct (nullable = true)
|    |-- _1: string (nullable = true)
|    |-- _2: string (nullable = true)
|-- count: long (nullable = true)

The relative frequency can be computed with a window function that partitions by the second word of the pair and applies a sum over that partition.

Then we restrict the entries in the df to the top x by count, and finally order by the second word of the pair and the relative frequency.

from pyspark.sql import functions as F
from pyspark.sql import Window as W
data = [(("project", "gutenberg"), 69,),
(("gutenberg", "ebook"), 14,),
(("ebook", "of"), 5,),
(("adventures", "of"), 6,),
(("by", "sir"), 12,),
(("conan", "doyle"), 1,),
(("changing", "all"), 2,),
(("all", "over"), 24,), ]
df = spark.createDataFrame(data, ("pair", "count", ))
ws = W.partitionBy(F.col("pair")._2).rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
(df.withColumn("relative_freq", F.col("count") / F.sum("count").over(ws))
   .orderBy(F.col("count").desc())
   .limit(3)  # change here to select top 1000
   .orderBy(F.desc(F.col("pair")._2), F.col("relative_freq").desc())
).show()
"""
+--------------------+-----+-------------+
|                pair|count|relative_freq|
+--------------------+-----+-------------+
|         {all, over}|   24|          1.0|
|{project, gutenberg}|   69|          1.0|
|  {gutenberg, ebook}|   14|          1.0|
+--------------------+-----+-------------+
"""
