How to measure a similarity score between a row and a matrix/table in pyspark



I have a table of user preferences:

+-------+------+-------+-------+
|user_id|Action| Comedy|Fantasy|
+-------+------+-------+-------+
|   100 |  0   | 0.33..| 0.66..|
|   101 |0.42..| 0.15..| 0.57..|
+-------+------+-------+-------+

and a table of movie genre content:

+-------+------+-------+-------+
|movieId|Action| Comedy|Fantasy|
+-------+------+-------+-------+
|  1001 |  1   |   1   |   0   |
|  1011 |  0   |   1   |   1   |
+-------+------+-------+-------+

How can I take the dot product (as a similarity measure) of a user's preference row (looked up by its user_id) with each movie content row, so as to output the movieId that best matches that user's genre preferences? Either RDD or DataFrame format is fine.

Here is my attempt.

crossJoin pairs the two dataframes on every combination of user_id and movieId, so the result has (# of user_id) * (# of movieId) rows.

Then zip_with multiplies the two arrays element by element with the supplied lambda; here x * y is applied to each element x of array1 and the corresponding element y of array2.

Finally, aggregate folds the element-wise products into a single value, i.e. their sum: starting from sum = 0, each element x of zipArray is added to the temporary variable sum, which is just the usual summation.
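To see what the two higher-order functions do in isolation, here is a small standalone sketch (both are available as Spark SQL functions since 2.4); the literal arrays are example values only:

from pyspark.sql import SparkSession
from pyspark.sql.functions import expr

spark = SparkSession.builder.getOrCreate()

# one row with two example arrays (values chosen only for illustration)
demo = spark.createDataFrame([([0.0, 0.33, 0.66], [0, 1, 1])], ["array1", "array2"])

demo.select(
    # element-wise product of the two arrays
    expr("zip_with(array1, array2, (x, y) -> x * y)").alias("zipArray"),
    # fold the products into one sum, starting from 0.0
    expr("aggregate(zip_with(array1, array2, (x, y) -> x * y), 0D, (sum, x) -> sum + x)").alias("dotProduct")
).show(truncate=False)
# zipArray = [0.0, 0.33, 0.66], dotProduct ≈ 0.99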

from pyspark.sql.functions import array, expr, rank, desc

df1 = spark.read.option("header","true").option("inferSchema","true").csv("test1.csv")
df2 = spark.read.option("header","true").option("inferSchema","true").csv("test2.csv")

# keep only the genre columns
df1_cols = df1.columns
df1_cols.remove('user_id')
df2_cols = df2.columns
df2_cols.remove('movieId')

# pack the genre columns of each dataframe into a single array column
df1 = df1.withColumn('array1', array(df1_cols))
df2 = df2.withColumn('array2', array(df2_cols))

# pair every user with every movie
df3 = df1.crossJoin(df2)
df3.show(10, False)
+-------+------+------+-------+------------------+-------+------+------+-------+---------+
|user_id|Action|Comedy|Fantasy|array1            |movieId|Action|Comedy|Fantasy|array2   |
+-------+------+------+-------+------------------+-------+------+------+-------+---------+
|100    |0.0   |0.33  |0.66   |[0.0, 0.33, 0.66] |1001   |1     |1     |0      |[1, 1, 0]|
|100    |0.0   |0.33  |0.66   |[0.0, 0.33, 0.66] |1011   |0     |1     |1      |[0, 1, 1]|
|101    |0.42  |0.15  |0.57   |[0.42, 0.15, 0.57]|1001   |1     |1     |0      |[1, 1, 0]|
|101    |0.42  |0.15  |0.57   |[0.42, 0.15, 0.57]|1011   |0     |1     |1      |[0, 1, 1]|
+-------+------+------+-------+------------------+-------+------+------+-------+---------+

df3 = df3.withColumn('zipArray', expr("zip_with(array1, array2, (x, y) -> x * y)")) \
         .withColumn('dotProduct', expr("aggregate(zipArray, 0D, (sum, x) -> sum + x)"))

df3.show(10, False)
+-------+------+------+-------+------------------+-------+------+------+-------+---------+-----------------+----------+
|user_id|Action|Comedy|Fantasy|array1            |movieId|Action|Comedy|Fantasy|array2   |zipArray         |dotProduct|
+-------+------+------+-------+------------------+-------+------+------+-------+---------+-----------------+----------+
|100    |0.0   |0.33  |0.66   |[0.0, 0.33, 0.66] |1001   |1     |1     |0      |[1, 1, 0]|[0.0, 0.33, 0.0] |0.33      |
|100    |0.0   |0.33  |0.66   |[0.0, 0.33, 0.66] |1011   |0     |1     |1      |[0, 1, 1]|[0.0, 0.33, 0.66]|0.99      |
|101    |0.42  |0.15  |0.57   |[0.42, 0.15, 0.57]|1001   |1     |1     |0      |[1, 1, 0]|[0.42, 0.15, 0.0]|0.57      |
|101    |0.42  |0.15  |0.57   |[0.42, 0.15, 0.57]|1011   |0     |1     |1      |[0, 1, 1]|[0.0, 0.15, 0.57]|0.72      |
+-------+------+------+-------+------------------+-------+------+------+-------+---------+-----------------+----------+
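For reference, on PySpark 3.1+ the same two steps can also be written with the DataFrame-API versions of zip_with and aggregate instead of SQL expressions; this is an equivalent sketch, not a change in logic:

from pyspark.sql import functions as F

df3_alt = df3.withColumn(
    'dotProduct',
    F.aggregate(
        F.zip_with('array1', 'array2', lambda x, y: x * y),  # element-wise products
        F.lit(0.0),                                           # start the sum at 0.0
        lambda acc, x: acc + x                                # add each product to the running sum
    )
)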

from pyspark.sql import Window

# for every user, keep the movie(s) with the highest dotProduct
window = Window.partitionBy('user_id').orderBy(desc('dotProduct'))
df3.select('user_id', 'movieId', 'dotProduct') \
   .withColumn('rank', rank().over(window)) \
   .filter('rank = 1') \
   .drop('rank') \
   .show(10, False)
+-------+-------+----------+
|user_id|movieId|dotProduct|
+-------+-------+----------+
|101    |1011   |0.72      |
|100    |1011   |0.99      |
+-------+-------+----------+
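Note that the raw dot product tends to favor movies tagged with more genres. If a normalized score is preferred, the same arrays can be turned into a cosine similarity; this is an optional sketch on top of df3 above, not part of the original attempt:

# divide the dot product by the lengths of the two vectors
df3_cos = df3 \
    .withColumn('norm1', expr("sqrt(aggregate(array1, 0D, (s, x) -> s + x * x))")) \
    .withColumn('norm2', expr("sqrt(aggregate(array2, 0D, (s, x) -> s + x * x))")) \
    .withColumn('cosine', expr("dotProduct / (norm1 * norm2)"))

The same window/rank step can then be applied to 'cosine' instead of 'dotProduct'.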
