如何有效地连接大型pyspark数据帧和小型python列表,以获得数据块上的一些NLP结果



我正在数据块上使用SparkNLP和SparkML进行NLP。

我使用LDA(来自SparkML(进行主题建模,得到了以下主题。

它是pyspark数据帧(df1(:

df1:
t_id word_index  weights
0   [0, 2, 3] [0.2105, 0.116, 0.18]
1   [1, 4, 6] [0.15, 0.05, 0.36]
"t_id" is topic id.
"weights" is the weight value of each word with index in "word_index"
The "word_index" in df1 corresponds to the location of each word in the list (lt).
df1 is small with not more than 100 rows.

我有一个单词列表(lt(:它是python列表

lt:
['like', 'book', 'music', 'bike', 'great', 'pen', 'laptop']

lt has about 20k words.

我有另一个超过2000万行的大型pyspark数据帧(df2(。它的大小为50 GB以上。

df2:

u_id p_id reviews
sra  tvs  "I like this music" # some english tokens (each token can be found in "lt")  
fbs  dvr  "The book is great"

我想把";t_ id";(主题(到df2的每一行,这样我就可以得到一个pyspark数据帧,比如:

u_id p_id reviews               t_id the_highest_weights
sra  tvs  "I like this music"   1    # the highest of all tokens' weights among all "t_id"s
fbs  dvr  "The book is great"   4

但是,一个评论可能具有多个";t_ id";(主题(,因为评论中的单词可能被多个"主题"覆盖;t_id";。所以我必须计算每个";t_id"s的总重量;t_ id";具有最高总重量的";评论";在df2。

它被表示为";最高权重";最终结果。

我不想使用";对于循环";逐行处理,因为它对大型数据帧来说效率不高。

如何使用pyspark数据帧(而不是panda(和矢量化(如果需要(来高效地获得结果?

感谢

我不确定你想要计算的确切内容,但你可以调整这个答案来获得你需要的内容。假设你想为每个句子找到具有最大分数的t_id(由其标记的权重之和给出(。

您可以从生成一个数据帧开始,该数据帧将每个单词与其索引相关联。

df_lt = spark.createDataFrame([(i, lt[i]) for i in 
range(0, len(lt))], ['word_index', 'w'])

然后,我们将使df1变平,使得每一行都包含一个t_id索引、一个单词索引和相应的权重。为此,我们可以使用UDF。请注意,在火花>=2.4您可以使用array_unioncreate_map,但由于df1很小,使用UDF不会有问题。


def create_pairs(index, weights):
return [(index[i], weights[i]) for i in range(0, len(index))]
create_pairs_udf = udf(create_pairs, ArrayType(StructType([
StructField(IntegerType(), 'word_index'),
StructField(DoubleType(), 'weight')
])))
df1_exp = df1
.select('t_id', explode(create_pairs_udf(df1['word_index'], df1['weights']))
.alias('pair'))
.select('t_id', 'pair.word_index', 'pair.weight')

最后,对大型数据帧df2进行了研究。我们首先将句子分解为每行一个单词(+u_idp_id(。然后我们需要加入df_lt来将单词翻译成索引。然后,通过加入df1_exp,我们将每个单词索引与其权重相关联。然后,我们根据所有索引(包括t_id(进行分组,以计算权重的总和,并再次分组,为每个句子选择最佳的t_id

为了加快速度,我们可以提示spark广播较小的df_ltdf1_exp,以避免混淆较大的df2

代码如下:

df2
.select("u_id", "p_id", explode(split(df2['reviews'], "\s+")).alias("w"))
.join(broadcast(df_lt), ['w'])
.drop('w')
.join(broadcast(df1_exp), ['word_index'])
.groupBy('u_id', 'p_id', 't_id')
.agg(sum('weight').alias('score'))
.withColumn('t_id', struct('score', 't_id'))
.groupBy('u_id', 'p_id')
.agg(max('t_id').alias('t_id'))
.select('u_id', 'p_id', 't_id.score', 't_id.t_id')
.show()
+----+----+------+----+
|u_id|p_id| score|t_id|
+----+----+------+----+
| fbs| dvr|   0.2|   1|
| sra| tvs|0.3265|   0|
+----+----+------+----+

最新更新