我正在尝试计算文本文件中的单词对。首先,我对文本进行了一些预处理,然后计算单词对,如下所示:
((Aspire, to), 1) ; ((to, inspire), 4) ; ((inspire, before), 38)...
现在,我想报告1000个最频繁的配对,按排序
- 单词(两个单词中的第二个(
- 相对频率(成对出现次数/第二个单词总出现次数(
以下是我迄今为止所做的
from pyspark.sql import SparkSession
import re
spark = SparkSession.builder.appName("Bigram occurences and relative frequencies").master("local[*]").getOrCreate()
sc = spark.sparkContext
text = sc.textFile("big.txt")
tokens = text.map(lambda x: x.lower()).map(lambda x: re.split("[s,.;:!?]+", x))
pairs = tokens.flatMap(lambda xs: (tuple(x) for x in zip(xs, xs[1:]))).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
frame = pairs.toDF(['pair', 'count'])
# Dataframe ordered by the most frequent pair to the least
most_frequent = frame.sort(frame['count'].desc())
# For each row, trying to add a column with the relative frequency, but I'm getting an error
with_rf = frame.withColumn("rf", frame['count'] / (frame.pair._2.sum()))
我想我已经比较接近我想要的结果了,但我想不通。我是Spark和DataFrames的新手。我也试过
import pyspark.sql.functions as F
frame.groupBy(frame['pair._2']).agg((F.col('count') / F.sum('count')).alias('rf')).show()
如有任何帮助,我们将不胜感激。
EDIT:这是frame
数据帧的示例
+--------------------+-----+
| pair|count|
+--------------------+-----+
|{project, gutenberg}| 69|
| {gutenberg, ebook}| 14|
| {ebook, of}| 5|
| {adventures, of}| 6|
| {by, sir}| 12|
| {conan, doyle)}| 1|
| {changing, all}| 2|
| {all, over}| 24|
+--------------------+-----+
root
|-- pair: struct (nullable = true)
| |-- _1: string (nullable = true)
| |-- _2: string (nullable = true)
|-- count: long (nullable = true)
可以使用window
函数计算relative frequency
,该函数按pair
中的第二个字进行分区,并应用sum
运算。
然后,我们根据count
将df中的条目限制在顶部x,并最终按对中的第二个单词和相对频率排序。
from pyspark.sql import functions as F
from pyspark.sql import Window as W
data = [(("project", "gutenberg"), 69,),
(("gutenberg", "ebook"), 14,),
(("ebook", "of"), 5,),
(("adventures", "of"), 6,),
(("by", "sir"), 12,),
(("conan", "doyle"), 1,),
(("changing", "all"), 2,),
(("all", "over"), 24,), ]
df = spark.createDataFrame(data, ("pair", "count", ))
ws = W.partitionBy(F.col("pair")._2).rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
(df.withColumn("relative_freq", F.col("count") / F.sum("count").over(ws))
.orderBy(F.col("count").desc())
.limit(3) # change here to select top 1000
.orderBy(F.desc(F.col("pair")._2), F.col("relative_freq").desc())
).show()
"""
+--------------------+-----+-------------+
| pair|count|relative_freq|
+--------------------+-----+-------------+
| {all, over}| 24| 1.0|
|{project, gutenberg}| 69| 1.0|
| {gutenberg, ebook}| 14| 1.0|
+--------------------+-----+-------------+
"""