我正在数据块上使用SparkNLP和SparkML进行NLP。
我使用LDA(来自SparkML(进行主题建模,得到了以下主题。
它是pyspark数据帧(df1(:
df1:
t_id word_index weights
0 [0, 2, 3] [0.2105, 0.116, 0.18]
1 [1, 4, 6] [0.15, 0.05, 0.36]
"t_id" is topic id.
"weights" is the weight value of each word with index in "word_index"
The "word_index" in df1 corresponds to the location of each word in the list (lt).
df1 is small with not more than 100 rows.
我有一个单词列表(lt(:它是python列表
lt:
['like', 'book', 'music', 'bike', 'great', 'pen', 'laptop']
lt has about 20k words.
我有另一个超过2000万行的大型pyspark数据帧(df2(。它的大小为50 GB以上。
df2:
u_id p_id reviews
sra tvs "I like this music" # some english tokens (each token can be found in "lt")
fbs dvr "The book is great"
我想把";t_ id";(主题(到df2的每一行,这样我就可以得到一个pyspark数据帧,比如:
u_id p_id reviews t_id the_highest_weights
sra tvs "I like this music" 1 # the highest of all tokens' weights among all "t_id"s
fbs dvr "The book is great" 4
但是,一个评论可能具有多个";t_ id";(主题(,因为评论中的单词可能被多个"主题"覆盖;t_id";。所以我必须计算每个";t_id"s的总重量;t_ id";具有最高总重量的";评论";在df2。
它被表示为";最高权重";最终结果。
我不想使用";对于循环";逐行处理,因为它对大型数据帧来说效率不高。
如何使用pyspark数据帧(而不是panda(和矢量化(如果需要(来高效地获得结果?
感谢
我不确定你想要计算的确切内容,但你可以调整这个答案来获得你需要的内容。假设你想为每个句子找到具有最大分数的t_id
(由其标记的权重之和给出(。
您可以从生成一个数据帧开始,该数据帧将每个单词与其索引相关联。
df_lt = spark.createDataFrame([(i, lt[i]) for i in
range(0, len(lt))], ['word_index', 'w'])
然后,我们将使df1变平,使得每一行都包含一个t_id
索引、一个单词索引和相应的权重。为此,我们可以使用UDF。请注意,在火花>=2.4您可以使用array_union
和create_map
,但由于df1
很小,使用UDF不会有问题。
def create_pairs(index, weights):
return [(index[i], weights[i]) for i in range(0, len(index))]
create_pairs_udf = udf(create_pairs, ArrayType(StructType([
StructField(IntegerType(), 'word_index'),
StructField(DoubleType(), 'weight')
])))
df1_exp = df1
.select('t_id', explode(create_pairs_udf(df1['word_index'], df1['weights']))
.alias('pair'))
.select('t_id', 'pair.word_index', 'pair.weight')
最后,对大型数据帧df2
进行了研究。我们首先将句子分解为每行一个单词(+u_id
和p_id
(。然后我们需要加入df_lt
来将单词翻译成索引。然后,通过加入df1_exp
,我们将每个单词索引与其权重相关联。然后,我们根据所有索引(包括t_id
(进行分组,以计算权重的总和,并再次分组,为每个句子选择最佳的t_id
。
为了加快速度,我们可以提示spark广播较小的df_lt
和df1_exp
,以避免混淆较大的df2
。
代码如下:
df2
.select("u_id", "p_id", explode(split(df2['reviews'], "\s+")).alias("w"))
.join(broadcast(df_lt), ['w'])
.drop('w')
.join(broadcast(df1_exp), ['word_index'])
.groupBy('u_id', 'p_id', 't_id')
.agg(sum('weight').alias('score'))
.withColumn('t_id', struct('score', 't_id'))
.groupBy('u_id', 'p_id')
.agg(max('t_id').alias('t_id'))
.select('u_id', 'p_id', 't_id.score', 't_id.t_id')
.show()
+----+----+------+----+
|u_id|p_id| score|t_id|
+----+----+------+----+
| fbs| dvr| 0.2| 1|
| sra| tvs|0.3265| 0|
+----+----+------+----+