Convert df.apply to Spark to run in parallel on all cores



We are working with a pandas DataFrame. We have a function used on retail data that runs daily, row by row, to calculate the difference between items, as shown below:

for itemiter in range(len(RetDf)):
    column = RetDf.loc[itemiter, "username"]
    RetDf[column] = RetDf.apply(lambda row: ItemDiff(RetDf.loc[itemiter, 'Val'], row['Val']), axis=1)

Is there a way to convert this to a SparkContext / RDD so that it runs in parallel and uses all the cores?

Sample data with dummy values for RetDf:
username    UserId     Val
abc75757    1234       [0.0 , 0.0, 1.0, 2.0] 
abcraju     4567       [0.0 , 0.0, 1.0, 2.0]
xyzuser     4343       [0.0 , 0.0, 1.0, 2.0]
user4abc    2323       [0.0 , 0.0, 1.0, 2.0]

FinalOutput: 
username    UserId     Val                     abc75757  abcraju        xyzuser     user4abc
abc75757    1234       [0.0 , 0.0, 1.0, 2.0]   2.0       0.0            0.0         1.0
abcraju     4567       [0.0 , 0.0, 1.0, 2.0]   2.0       0.0            0.0         1.0
xyzuser     4343       [0.0 , 0.0, 1.0, 2.0]   2.0       0.0            0.0         1.0
user4abc    2323       [0.0 , 0.0, 1.0, 2.0]   2.0       0.0            4.0         1.0
ItemDiff
def ItemDiff(z1, z2):
    distance_t = 0.0
    path_t = [(0, 0)]
    distance_t, path_t = fastdtw(z1, z2)
    return distance_t

You have turned a combinations problem into a product problem, doing twice as many calculations as necessary. I'm not sure of a good pure-pandas way to do this… but even without parallelization, the following should still be much faster.

a = [["abc75757", 1234, [4.0, 0.0, 1.0, 4.0]],
["abcraju", 4567, [0.0, 0.0, 3.0, 2.0]],
["xyzuser", 4343, [0.0, 1.0, 1.0, 2.0]],
["user4abc", 2323, [0.0, 0.0, 1.0, 3.0]]]
RetDf = pd.DataFrame(a, columns=['username', 'UserId', 'Val'])
from itertools import combinations
combos = combinations(RetDf[['username', 'Val']].to_numpy(), r=2)
combos = [(x[0][0], x[1][0], fastdtw(x[0][1], x[1][1])[0]) for x in combos]
permuts = [(x[0], x[1], x[2]) for x in combos] + [(x[1], x[0], x[2]) for x in combos]
df = pd.DataFrame(permuts, columns=['username', 'pair', 'value']).pivot(index='username', columns='pair').droplevel(0, axis=1).reset_index()
output = RetDf.merge(df).fillna(0)
print(output)

Output:

username  UserId                   Val  abc75757  abcraju  user4abc  xyzuser                                                                 
0   abc75757    1234  [4.0, 0.0, 1.0, 4.0]       0.0      8.0       5.0      6.0
1   abcraju     4567  [0.0, 0.0, 3.0, 2.0]       8.0      0.0       2.0      3.0
2   xyzuser     4343  [0.0, 1.0, 1.0, 2.0]       6.0      3.0       1.0      0.0
3   user4abc    2323  [0.0, 0.0, 1.0, 3.0]       5.0      2.0       0.0      1.0
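
If you also want to use all cores without bringing Spark in, the pairwise fastdtw calls above are embarrassingly parallel, so a standard process pool can spread them across cores. This is a minimal sketch of my own (not part of the answer above), assuming fastdtw is installed; pair_distance is a hypothetical helper, and the resulting combos list feeds into the same permuts/pivot/merge steps shown above.

import pandas as pd
from itertools import combinations
from multiprocessing import Pool
from fastdtw import fastdtw

a = [["abc75757", 1234, [4.0, 0.0, 1.0, 4.0]],
     ["abcraju", 4567, [0.0, 0.0, 3.0, 2.0]],
     ["xyzuser", 4343, [0.0, 1.0, 1.0, 2.0]],
     ["user4abc", 2323, [0.0, 0.0, 1.0, 3.0]]]
RetDf = pd.DataFrame(a, columns=['username', 'UserId', 'Val'])

def pair_distance(pair):
    # pair is ((username1, val1), (username2, val2))
    (u1, v1), (u2, v2) = pair
    return u1, u2, fastdtw(v1, v2)[0]

if __name__ == "__main__":
    pairs = list(combinations(zip(RetDf['username'], RetDf['Val']), r=2))
    with Pool() as pool:  # one worker per available core by default
        combos = pool.map(pair_distance, pairs)
    # combos has the same (username, pair, distance) shape as above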

Pandas on Spark is worth mentioning, but it's no help here because it has no direct translation of pd.loc.

It looks like you're doing a Cartesian join, which is expensive, but here is what I would suggest:

from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import udf
import pyspark.sql.functions as f
from pyspark.sql.types import *
from fastdtw import fastdtw

# Create PySpark SparkSession
# (note: local[1] runs on a single core; use local[*] to use all available cores)
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("SparkByExamples.com") \
    .getOrCreate()

# Create PySpark DataFrame from Pandas
raw_data = {'username': ['abc75757', 'abcraju', 'xyzuser', 'user4abc'],
            'UserId': [1234, 4567, 4343, 2323],
            'Val': [[0.0, 0.0, 1.0, 2.0], [0.0, 0.0, 1.0, 2.0],
                    [0.0, 0.0, 1.0, 2.0], [0.0, 0.0, 1.0, 2.0]]}
RetDf = pd.DataFrame(raw_data)

def ItemDiff(z1, z2):
    distance_t = 0.0
    path_t = [(0, 0)]
    distance_t, path_t = fastdtw(z1, z2)
    return float(distance_t)

itemDiff = udf(ItemDiff, FloatType())  # create UDF to do the work

sparkDF = spark.createDataFrame(RetDf)
cartesianJoin = (
    sparkDF.crossJoin(sparkDF)  # this is expensive, but necessary
    # rename the columns for convenience; the 'my*' columns are the second copy
    .toDF("username", "UserId", "Val", "my_username", "myUserId", "myVal")
    .select(itemDiff("Val", "myVal").alias("dist"), f.col("*"))  # run the UDF
    .groupBy("Val", "UserId", "username")  # this enables us to pivot
    .pivot("my_username")  # pivot on the renamed second copy of username
    .max("dist")  # handy trick: there is only 1 value per cell, so max is just that number
)
cartesianJoin.show()
+--------------------+------+--------+--------+-------+--------+-------+        
|                 Val|UserId|username|abc75757|abcraju|user4abc|xyzuser|
+--------------------+------+--------+--------+-------+--------+-------+
|[0.0, 0.0, 1.0, 2.0]|  1234|abc75757|     0.0|    0.0|     0.0|    0.0|
|[0.0, 0.0, 1.0, 2.0]|  4343| xyzuser|     0.0|    0.0|     0.0|    0.0|
|[0.0, 0.0, 1.0, 2.0]|  4567| abcraju|     0.0|    0.0|     0.0|    0.0|
|[0.0, 0.0, 1.0, 2.0]|  2323|user4abc|     0.0|    0.0|     0.0|    0.0|
+--------------------+------+--------+--------+-------+--------+-------+

The UDF documentation is here. If you can rewrite your logic to avoid the UDF it will run faster, but you need to know which Spark SQL functions can do the same work. This could be a good time to check whether you really need all of the data to compute the columns, or whether you can simplify your logic.
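
As an illustration of that point (my own sketch, not part of the answer above): if the per-pair metric were something built-in, say a plain Euclidean distance between the two arrays instead of DTW, the distance column could be computed entirely with Spark SQL's higher-order array functions (zip_with and aggregate, available since Spark 2.4) and no Python UDF at all, reusing sparkDF from the code above:

import pyspark.sql.functions as f

# Euclidean distance between Val and myVal, computed entirely inside Spark SQL
euclidean = f.expr(
    "aggregate("
    "  zip_with(Val, myVal, (x, y) -> pow(x - y, 2)),"
    "  cast(0 as double),"
    "  (acc, v) -> acc + v,"
    "  acc -> sqrt(acc))"
)

no_udf = (
    sparkDF.crossJoin(sparkDF)
    .toDF("username", "UserId", "Val", "my_username", "myUserId", "myVal")
    .select(euclidean.alias("dist"), f.col("*"))
)

This only swaps out the distance itself; the groupBy/pivot steps stay the same. DTW is not expressible with built-in functions, so for fastdtw you still need the UDF (or a pandas_udf to cut the per-row overhead).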
