我正在执行基于3个不同的PySpark数据帧的计算。
该脚本在某种意义上工作,它按应有的方式执行计算,但是,我难以正确处理所述计算的结果。
import sys
import numpy as np
from pyspark import SparkConf, SparkContext, SQLContext
sc = SparkContext("local")
sqlContext = SQLContext(sc)
# Dummy Data
df = sqlContext.createDataFrame([[0,1,0,0,0],[1,1,0,0,1],[0,0,1,0,1],[1,0,1,1,0],[1,1,0,0,0]], ['p1', 'p2', 'p3', 'p4', 'p5'])
df.show()
+---+---+---+---+---+
| p1| p2| p3| p4| p5|
+---+---+---+---+---+
| 0| 1| 0| 0| 0|
| 1| 1| 0| 0| 1|
| 0| 0| 1| 0| 1|
| 1| 0| 1| 1| 0|
| 1| 1| 0| 0| 0|
+---+---+---+---+---+
# Values
values = sqlContext.createDataFrame([(0,1,'p1'),(None,1,'p2'),(0,0,'p3'),(None,0, 'p4'),(1,None,'p5')], ('f1', 'f2','index'))
values.show()
+----+----+-----+
| f1| f2|index|
+----+----+-----+
| 0| 1| p1|
|null| 1| p2|
| 0| 0| p3|
|null| 0| p4|
| 1|null| p5|
+----+----+-----+
# Weights
weights = sqlContext.createDataFrame([(4,3,'p1'),(None,1,'p2'),(2,2,'p3'),(None, 3, 'p4'),(3,None,'p5')], ('f1', 'f2','index'))
weights.show()
+----+----+-----+
| f1| f2|index|
+----+----+-----+
| 4| 3| p1|
|null| 1| p2|
| 2| 2| p3|
|null| 3| p4|
| 3|null| p5|
+----+----+-----+
# Function: it sums the vector W for the values of Row equal to the value of V and then divide by the length of V.
# If there a no similarities between Row and V outputs 0
def W_sum(row,v,w):
if len(w[row==v])>0:
return float(np.sum(w[row==v])/len(w))
else:
return 0.0
对于数据中的每一列和每一行,将应用上述函数。
# We iterate over the columns of Values (except the last one called index)
for val in values.columns[:-1]:
# we filter the data to work only with the columns that are defined for the selected Value
defined_col = [i[0] for i in values.where(F.col(val) >= 0).select(values.index).collect()]
# we select only the useful columns
df_select= df.select(defined_col)
# we retrieve the reference value and weights
V = np.array(values.where(values.index.isin(defined_col)).select(val).collect()).flatten()
W = np.array(weights.where(weights.index.isin(defined_col)).select(val).collect()).flatten()
W_sum_udf = F.udf(lambda row: W_sum(row, V, W), FloatType())
df_select.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in df_select.columns))))
这给出了:
+---+---+---+---+---+---+
| p1| p2| p3| p4| p5| f1|
+---+---+---+---+---+---+
| 0| 1| 0| 0| 0|2.0|
| 1| 1| 0| 0| 1|1.0|
| 0| 0| 1| 0| 1|2.0|
| 1| 0| 1| 1| 0|0.0|
| 1| 1| 0| 0| 0|0.0|
+---+---+---+---+---+---+
它按照我的要求将列添加到切片数据帧。问题是我宁愿将数据收集到一个新的数据中,我可以在最后访问该新数据以查阅结果。
是否有可能在 PySpark 中(有点有效地)增长一个数据帧,就像我对熊猫一样?
编辑以使我的目标更清晰:理想情况下,我会得到一个只包含计算列的数据帧,如下所示:
+---+---+
| f1| f2|
+---+---+
|2.0|1.0|
|1.0|2.0|
|2.0|0.0|
|0.0|0.0|
|0.0|2.0|
+---+---+
你的问题有一些问题...
首先,你的for
循环会产生一个错误,因为最后一行中的df_select
没有定义;最后也没有赋值(它会产生什么?)。
假设df_select
实际上是您的subsubsample
数据帧,之前定义了一些行,并且您的最后一行类似于
new_df = subsubsample.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in subsubsample.columns))))
然后你的问题开始变得更加清晰。因为
values.columns[:-1]
# ['f1', 'f2']
整个循环的结果将只是
+---+---+---+---+---+
| p1| p2| p3| p4| f2|
+---+---+---+---+---+
| 0| 1| 0| 0|1.0|
| 1| 1| 0| 0|2.0|
| 0| 0| 1| 0|0.0|
| 1| 0| 1| 1|0.0|
| 1| 1| 0| 0|2.0|
+---+---+---+---+---+
即仅包含f2
列(自然,因为带有f1
的结果被简单地覆盖)。
现在,正如我所说,假设情况是这样的,并且您的问题实际上是如何将两列f1
和f2
放在一起而不是在不同的数据帧中,您可以忘记subsubsample
并将列附加到初始df
,然后可能会删除不需要的列:
init_cols = df.columns
init_cols
# ['p1', 'p2', 'p3', 'p4', 'p5']
new_df = df
for val in values.columns[:-1]:
# we filter the data to work only with the columns that are defined for the selected Value
defined_col = [i[0] for i in values.where(F.col(val) >= 0).select(values.index).collect()]
# we retrieve the reference value and weights
V = np.array(values.where(values.index.isin(defined_col)).select(val).collect()).flatten()
W = np.array(weights.where(weights.index.isin(defined_col)).select(val).collect()).flatten()
W_sum_udf = F.udf(lambda row: W_sum(row, V, W), FloatType())
new_df = new_df.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in defined_col)))) # change here
# drop initial columns:
for i in init_cols:
new_df = new_df.drop(i)
由此产生的new_df
将是:
+---+---+
| f1| f2|
+---+---+
|2.0|1.0|
|1.0|2.0|
|2.0|0.0|
|0.0|0.0|
|0.0|2.0|
+---+---+
更新(注释后):要强制W_sum
函数中的除法为浮点数,请使用:
from __future__ import division
现在new_df
将是:
+---------+----+
| f1| f2|
+---------+----+
| 2.0| 1.5|
|1.6666666|2.25|
|2.3333333|0.75|
| 0.0|0.75|
|0.6666667|2.25|
+---------+----+
根据您的评论,f2
完全正确。