以迭代方式保存到 Pyspark 中的新数据帧



我正在执行基于3个不同的PySpark数据帧的计算。

该脚本在某种意义上工作,它按应有的方式执行计算,但是,我难以正确处理所述计算的结果。

import sys
import numpy as np
from pyspark import SparkConf, SparkContext, SQLContext
sc = SparkContext("local")
sqlContext = SQLContext(sc)
# Dummy Data
df = sqlContext.createDataFrame([[0,1,0,0,0],[1,1,0,0,1],[0,0,1,0,1],[1,0,1,1,0],[1,1,0,0,0]], ['p1', 'p2', 'p3', 'p4', 'p5'])
df.show()
+---+---+---+---+---+
| p1| p2| p3| p4| p5|
+---+---+---+---+---+
|  0|  1|  0|  0|  0|
|  1|  1|  0|  0|  1|
|  0|  0|  1|  0|  1|
|  1|  0|  1|  1|  0|
|  1|  1|  0|  0|  0|
+---+---+---+---+---+
# Values
values = sqlContext.createDataFrame([(0,1,'p1'),(None,1,'p2'),(0,0,'p3'),(None,0, 'p4'),(1,None,'p5')], ('f1', 'f2','index'))
values.show()
+----+----+-----+
|  f1|  f2|index|
+----+----+-----+
|   0|   1|   p1|
|null|   1|   p2|
|   0|   0|   p3|
|null|   0|   p4|
|   1|null|   p5|
+----+----+-----+
# Weights
weights = sqlContext.createDataFrame([(4,3,'p1'),(None,1,'p2'),(2,2,'p3'),(None, 3, 'p4'),(3,None,'p5')], ('f1', 'f2','index'))
weights.show()
+----+----+-----+
|  f1|  f2|index|
+----+----+-----+
|   4|   3|   p1|
|null|   1|   p2|
|   2|   2|   p3|
|null|   3|   p4|
|   3|null|   p5|
+----+----+-----+
# Function: it sums the vector W for the values of Row equal to the value of V and then divide by the length of V.
# If there a no similarities between Row and V outputs 0
def W_sum(row,v,w):
if len(w[row==v])>0:
return float(np.sum(w[row==v])/len(w))
else:
return 0.0

对于数据中的每一列和每一行,将应用上述函数。

# We iterate over the columns of Values (except the last one called index)
for val in values.columns[:-1]:
# we filter the data to work only with the columns that are defined for the selected Value
defined_col = [i[0] for i in values.where(F.col(val) >= 0).select(values.index).collect()]
# we select only the useful columns
df_select= df.select(defined_col)
# we retrieve the reference value and weights
V = np.array(values.where(values.index.isin(defined_col)).select(val).collect()).flatten()
W = np.array(weights.where(weights.index.isin(defined_col)).select(val).collect()).flatten()
W_sum_udf = F.udf(lambda row: W_sum(row, V, W), FloatType())
df_select.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in df_select.columns))))

这给出了:

+---+---+---+---+---+---+
| p1| p2| p3| p4| p5| f1|
+---+---+---+---+---+---+
|  0|  1|  0|  0|  0|2.0|
|  1|  1|  0|  0|  1|1.0|
|  0|  0|  1|  0|  1|2.0|
|  1|  0|  1|  1|  0|0.0|
|  1|  1|  0|  0|  0|0.0|
+---+---+---+---+---+---+

它按照我的要求将列添加到切片数据帧。问题是我宁愿将数据收集到一个新的数据中,我可以在最后访问该新数据以查阅结果。
是否有可能在 PySpark 中(有点有效地)增长一个数据帧,就像我对熊猫一样?

编辑以使我的目标更清晰:理想情况下,我会得到一个只包含计算列的数据帧,如下所示:

+---+---+
| f1| f2|
+---+---+
|2.0|1.0|
|1.0|2.0|
|2.0|0.0|
|0.0|0.0|
|0.0|2.0|
+---+---+

你的问题有一些问题...

首先,你的for循环会产生一个错误,因为最后一行中的df_select没有定义;最后也没有赋值(它会产生什么?)。

假设df_select实际上是您的subsubsample数据帧,之前定义了一些行,并且您的最后一行类似于

new_df = subsubsample.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in subsubsample.columns))))

然后你的问题开始变得更加清晰。因为

values.columns[:-1]
#  ['f1', 'f2']

整个循环的结果将只是

+---+---+---+---+---+ 
| p1| p2| p3| p4| f2| 
+---+---+---+---+---+
|  0|  1|  0|  0|1.0|
|  1|  1|  0|  0|2.0|
|  0|  0|  1|  0|0.0|
|  1|  0|  1|  1|0.0|
|  1|  1|  0|  0|2.0|
+---+---+---+---+---+

即仅包含f2列(自然,因为带有f1的结果被简单地覆盖)。

现在,正如我所说,假设情况是这样的,并且您的问题实际上是如何将两列f1f2放在一起而不是在不同的数据帧中,您可以忘记subsubsample并将列附加到初始df,然后可能会删除不需要的列:

init_cols = df.columns
init_cols
#  ['p1', 'p2', 'p3', 'p4', 'p5']
new_df = df
for val in values.columns[:-1]:
# we filter the data to work only with the columns that are defined for the selected Value
defined_col = [i[0] for i in values.where(F.col(val) >= 0).select(values.index).collect()]
# we retrieve the reference value and weights
V = np.array(values.where(values.index.isin(defined_col)).select(val).collect()).flatten()
W = np.array(weights.where(weights.index.isin(defined_col)).select(val).collect()).flatten()
W_sum_udf = F.udf(lambda row: W_sum(row, V, W), FloatType())
new_df = new_df.withColumn(val, W_sum_udf(F.array(*(F.col(x) for x in defined_col)))) # change here
# drop initial columns:
for i in init_cols:
new_df = new_df.drop(i)

由此产生的new_df将是:

+---+---+ 
| f1| f2| 
+---+---+
|2.0|1.0| 
|1.0|2.0|
|2.0|0.0|
|0.0|0.0|
|0.0|2.0|
+---+---+ 

更新(注释后):要强制W_sum函数中的除法为浮点数,请使用:

from __future__ import division

现在new_df将是:

+---------+----+ 
|       f1|  f2|
+---------+----+ 
|      2.0| 1.5|
|1.6666666|2.25|
|2.3333333|0.75|
|      0.0|0.75|
|0.6666667|2.25|
+---------+----+

根据您的评论,f2完全正确。

相关内容

  • 没有找到相关文章

最新更新