I'm working in a Jupyter notebook with pandas, but when I use Spark I want to do the transformations and calculations with Spark DataFrames instead of pandas. Please help me convert some of these computations to Spark DataFrames or RDDs.
dataframe:
df =
+--------+-------+---------+--------+
| userId | item | price | value |
+--------+-------+---------+--------+
| 169 | I0111 | 5300 | 1 |
| 169 | I0973 | 70 | 1 |
| 336 | C0174 | 455 | 1 |
| 336 | I0025 | 126 | 1 |
| 336 | I0973 | 4 | 1 |
| 770963 | B0166 | 2 | 1 |
| 1294537| I0110 | 90 | 1 |
+--------+-------+---------+--------+
1. Calculation with pandas:
(1) userItem = df.groupby(['userId'])['item'].nunique()
The result is a Series object:
+--------+------+
| userId | item |
+--------+------+
| 169 | 2 |
| 336 | 3 |
| 770963 | 1 |
| 1294537| 1 |
+--------+------+
2. Using multiplication (a minimal pandas sketch of both steps is shown below):
data_sum = df.groupby(['userId', 'item'])['value'].sum() --> result is Series object
average_played = np.mean(userItem) --> result is number
(2) weighted_games_played = data_sum * (average_played / userItem)
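For reference, here is a minimal, self-contained pandas version of both steps on the sample data above (using .mul with level='userId' to spell out the per-user broadcast in (2)):

import numpy as np
import pandas as pd

df = pd.DataFrame({
    'userId': [169, 169, 336, 336, 336, 770963, 1294537],
    'item':   ['I0111', 'I0973', 'C0174', 'I0025', 'I0973', 'B0166', 'I0110'],
    'price':  [5300, 70, 455, 126, 4, 2, 90],
    'value':  [1, 1, 1, 1, 1, 1, 1],
})

# (1) number of distinct items per user -> Series indexed by userId
userItem = df.groupby(['userId'])['item'].nunique()

# (2) per-(userId, item) sums, scaled by average_played / userItem for each user
data_sum = df.groupby(['userId', 'item'])['value'].sum()
average_played = np.mean(userItem)
weighted_games_played = data_sum.mul(average_played / userItem, level='userId')
print(weighted_games_played)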
Please help me do (1) and (2) using Spark DataFrames and Spark operations.
You can do (1) with the following:
import pyspark.sql.functions as f
userItem=df.groupby('userId').agg(f.expr('count(distinct item)').alias('n_item'))
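Equivalently, the same aggregation can be written with the built-in countDistinct instead of a SQL expression (a small sketch assuming the same df):

import pyspark.sql.functions as f

userItem = df.groupby('userId').agg(f.countDistinct('item').alias('n_item'))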
and (2):
data_sum=df.groupby(['userId','item']).agg(f.sum('value').alias('sum_value'))
average_played=userItem.agg(f.mean('n_item').alias('avg_played'))
data_sum=data_sum.join(userItem, on='userId').crossJoin(average_played)
data_sum=data_sum.withColumn("weighted_games_played", f.expr("sum_value*avg_played/n_item"))
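Put together, a self-contained sketch on the sample data (assuming an active SparkSession named spark, e.g. in a pyspark Jupyter session) looks like this:

import pyspark.sql.functions as f
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame(
    [(169, 'I0111', 5300, 1), (169, 'I0973', 70, 1),
     (336, 'C0174', 455, 1), (336, 'I0025', 126, 1),
     (336, 'I0973', 4, 1), (770963, 'B0166', 2, 1),
     (1294537, 'I0110', 90, 1)],
    ['userId', 'item', 'price', 'value'])

# (1) distinct items per user
userItem = df.groupby('userId').agg(f.expr('count(distinct item)').alias('n_item'))

# (2) weighted plays per (userId, item)
data_sum = df.groupby(['userId', 'item']).agg(f.sum('value').alias('sum_value'))
average_played = userItem.agg(f.mean('n_item').alias('avg_played'))
data_sum = (data_sum.join(userItem, on='userId')
                    .crossJoin(average_played)
                    .withColumn('weighted_games_played',
                                f.expr('sum_value*avg_played/n_item')))
data_sum.show()
# optionally pull the (small) result back into pandas for the notebook:
# data_sum.toPandas()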
You can define the method below (Scala):
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.mllib.linalg.{Matrices, Vectors}
import org.apache.spark.sql.{DataFrame, Row}
import org.apache.spark.sql.functions.{array, col}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

object retain {
  implicit class DataFrameTransforms(left: DataFrame) {
    val spark = left.sparkSession
    import spark.implicits._

    // Interpret the left DataFrame (all columns assumed Double) as a distributed RowMatrix
    val dftordd = left.rdd.map { row =>
      Vectors.dense(row.toSeq.toArray.map(_.asInstanceOf[Double]))
    }
    val leftRM = new RowMatrix(dftordd)

    def multiply(right: DataFrame): DataFrame = {
      // Collect the right DataFrame to the driver as a local dense matrix
      val matrixC = right.columns.map(col(_))
      val arr = right.select(array(matrixC: _*).as("arr")).as[Array[Double]].collect.flatten
      val rows = right.count().toInt
      val cols = matrixC.length
      // arr is row-major for a (rows x cols) matrix; Matrices.dense is column-major,
      // so build the transposed shape first and then transpose it back
      val rightRM = Matrices.dense(cols, rows, arr).transpose

      // Distributed (left) x local (right) product, collected back to the driver
      val product = leftRM.multiply(rightRM).rows
      val x = product.map(_.toArray).collect.map(p => Row(p: _*))

      // One DoubleType output column per component of the product rows
      var schema = new StructType()
      var i = 0
      while (i < cols) {
        schema = schema.add(StructField(s"component$i", DoubleType, nullable = true))
        i += 1
      }
      spark.createDataFrame(spark.sparkContext.parallelize(x), schema)
    }
  }
}
and then just use
import retain._
Say you have two DataFrames called df1 (M×N) and df2 (N×M); then
df1.multiply(df2)
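Since the question is framed in PySpark, here is a rough PySpark counterpart of the same idea, using BlockMatrix for the distributed product instead of collecting one side to the driver. It is only a sketch: it assumes both DataFrames contain nothing but numeric columns, an active SparkSession, and the helper name multiply_dataframes is mine, not part of any Spark API.

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

def multiply_dataframes(df1, df2):
    # Treat each all-numeric DataFrame as a matrix. Row order is taken as-is,
    # which Spark does not guarantee unless the DataFrames are explicitly ordered.
    def to_block_matrix(df):
        indexed = df.rdd.zipWithIndex().map(
            lambda pair: IndexedRow(pair[1], [float(x) for x in pair[0]]))
        return IndexedRowMatrix(indexed).toBlockMatrix()

    product = to_block_matrix(df1).multiply(to_block_matrix(df2))
    rows = (product.toIndexedRowMatrix().rows
                   .sortBy(lambda r: r.index)
                   .map(lambda r: r.vector.toArray().tolist()))
    # one output column per column of df2, mirroring the Scala helper's schema
    return rows.toDF(['component{}'.format(i) for i in range(len(df2.columns))])

# usage, mirroring the Scala example: df1 is M x N, df2 is N x M
# result = multiply_dataframes(df1, df2)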