We have a pandas DataFrame that we use with retail data. A function runs daily, row by row, to compute the difference between items, like below:
for itemiter in range(len(RetDf)):
    column = RetDf.loc[itemiter, "username"]
    RetDf[column] = RetDf.apply(lambda row: ItemDiff(RetDf.loc[itemiter, 'Val'], row['Val']), axis=1)
Is there a way to convert this to a SparkContext RDD so it runs in parallel on all cores?
Sample data with dummy values for RetDf:
username UserId Val
abc75757 1234 [0.0 , 0.0, 1.0, 2.0]
abcraju 4567 [0.0 , 0.0, 1.0, 2.0]
xyzuser 4343 [0.0 , 0.0, 1.0, 2.0]
user4abc 2323 [0.0 , 0.0, 1.0, 2.0]
FinalOutput:
username UserId Val abc75757 abcraju xyzuser user4abc
abc75757 1234 [0.0 , 0.0, 1.0, 2.0] 2.0 0.0 0.0 1.0
abcraju 4567 [0.0 , 0.0, 1.0, 2.0] 2.0 0.0 0.0 1.0
xyzuser 4343 [0.0 , 0.0, 1.0, 2.0] 2.0 0.0 0.0 1.0
user4abc 2323 [0.0 , 0.0, 1.0, 2.0] 2.0 0.0 4.0 1.0
ItemDiff
def ItemDiff(z1, z2):
    distance_t = 0.0
    path_t = [(0, 0)]
    distance_t, path_t = fastdtw(z1, z2)
    return distance_t
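For completeness, here is a self-contained repro of the loop, with a stand-in distance (absolute elementwise difference) in place of fastdtw in case it isn't installed. Note the pattern calls the metric len(RetDf)² times:

```python
import pandas as pd

def item_diff_stub(z1, z2):
    # Stand-in for ItemDiff/fastdtw so the repro has no extra dependency.
    return float(sum(abs(a - b) for a, b in zip(z1, z2)))

RetDf = pd.DataFrame({
    "username": ["abc75757", "abcraju", "xyzuser", "user4abc"],
    "UserId": [1234, 4567, 4343, 2323],
    "Val": [[0.0, 0.0, 1.0, 2.0]] * 4,
})

# One new column per row: n rows x n apply calls = O(n**2) metric calls.
for itemiter in range(len(RetDf)):
    column = RetDf.loc[itemiter, "username"]
    RetDf[column] = RetDf.apply(
        lambda row: item_diff_stub(RetDf.loc[itemiter, "Val"], row["Val"]),
        axis=1,
    )

print(RetDf.columns.tolist())
```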
You've turned a combinations problem into a product problem, doing twice the necessary computation. I'm not sure of a good pure-pandas way to do this... but even without parallelization, this should still be much faster.
import pandas as pd
from itertools import combinations
from fastdtw import fastdtw

a = [["abc75757", 1234, [4.0, 0.0, 1.0, 4.0]],
     ["abcraju", 4567, [0.0, 0.0, 3.0, 2.0]],
     ["xyzuser", 4343, [0.0, 1.0, 1.0, 2.0]],
     ["user4abc", 2323, [0.0, 0.0, 1.0, 3.0]]]
RetDf = pd.DataFrame(a, columns=['username', 'UserId', 'Val'])

# Compute each unordered pair exactly once...
combos = combinations(RetDf[['username', 'Val']].to_numpy(), r=2)
combos = [(x[0][0], x[1][0], fastdtw(x[0][1], x[1][1])[0]) for x in combos]
# ...then mirror the pairs to fill both halves of the matrix.
permuts = combos + [(x[1], x[0], x[2]) for x in combos]
df = (pd.DataFrame(permuts, columns=['username', 'pair', 'value'])
        .pivot(index='username', columns='pair')
        .droplevel(0, axis=1)
        .reset_index())
output = RetDf.merge(df).fillna(0)  # fillna(0) fills the diagonal
print(output)
Output:
username UserId Val abc75757 abcraju user4abc xyzuser
0 abc75757 1234 [4.0, 0.0, 1.0, 4.0] 0.0 8.0 5.0 6.0
1 abcraju 4567 [0.0, 0.0, 3.0, 2.0] 8.0 0.0 2.0 3.0
2 xyzuser 4343 [0.0, 1.0, 1.0, 2.0] 6.0 3.0 1.0 0.0
3 user4abc 2323 [0.0, 0.0, 1.0, 3.0] 5.0 2.0 0.0 1.0
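Even without Spark, the per-pair calls can be fanned out across workers. A minimal sketch with concurrent.futures, using the same stand-in distance (absolute elementwise difference) so it runs without fastdtw; in real use you would map `fastdtw(z1, z2)[0]` instead, and a ProcessPoolExecutor with a module-level function for CPU-bound metrics:

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import combinations
import pandas as pd

def dist(z1, z2):
    # Stand-in for fastdtw(z1, z2)[0]; swap in the real metric.
    return float(sum(abs(p - q) for p, q in zip(z1, z2)))

data = [["abc75757", 1234, [4.0, 0.0, 1.0, 4.0]],
        ["abcraju", 4567, [0.0, 0.0, 3.0, 2.0]],
        ["xyzuser", 4343, [0.0, 1.0, 1.0, 2.0]],
        ["user4abc", 2323, [0.0, 0.0, 1.0, 3.0]]]
RetDf = pd.DataFrame(data, columns=["username", "UserId", "Val"])

pairs = list(combinations(RetDf[["username", "Val"]].to_numpy(), r=2))
with ThreadPoolExecutor() as pool:
    # Each unordered pair is scored once, in parallel.
    dists = list(pool.map(lambda x: dist(x[0][1], x[1][1]), pairs))

combos = [(x[0][0], x[1][0], d) for x, d in zip(pairs, dists)]
permuts = combos + [(v, u, d) for u, v, d in combos]  # mirror the pairs
df = (pd.DataFrame(permuts, columns=["username", "pair", "value"])
        .pivot(index="username", columns="pair", values="value")
        .reset_index())
output = RetDf.merge(df).fillna(0)
print(output)
```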
There is Pandas on Spark, but it doesn't help here, since there is no direct translation of pd.loc.
It looks like you're doing a cartesian join, which is expensive, but here's what I'd suggest:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql.functions import udf
import pyspark.sql.functions as f
from pyspark.sql.types import *
from fastdtw import fastdtw

# Create PySpark SparkSession (local[1] pins one core; use local[*] for all cores)
spark = (SparkSession.builder
         .master("local[1]")
         .appName("SparkByExamples.com")
         .getOrCreate())

# Create PySpark DataFrame from Pandas
raw_data = {'username': ['abc75757', 'abcraju', 'xyzuser', 'user4abc'],
            'UserId': [1234, 4567, 4343, 2323],
            'Val': [[0.0, 0.0, 1.0, 2.0], [0.0, 0.0, 1.0, 2.0],
                    [0.0, 0.0, 1.0, 2.0], [0.0, 0.0, 1.0, 2.0]]}
RetDf = pd.DataFrame(raw_data)

def ItemDiff(z1, z2):
    distance_t, path_t = fastdtw(z1, z2)
    return float(distance_t)

itemDiff = udf(ItemDiff, FloatType())  # create UDF to do the work
sparkDF = spark.createDataFrame(RetDf)

(sparkDF.crossJoin(sparkDF)  # this is expensive but necessary
    .toDF("username", "UserId", "Val",
          "my_username", "myUserId", "myVal")  # rename the columns for convenience
    .select(itemDiff("Val", "myVal").alias("dist"), f.col("*"))  # run the UDF
    .groupBy("Val", "UserId", "username")  # this enables us to pivot
    .pivot("my_username")  # be careful to pivot on the renamed right-hand column
    .max("dist")  # handy trick: there is only one value per cell, so max is just that number
    .show())
+--------------------+------+--------+--------+-------+--------+-------+
| Val|UserId|username|abc75757|abcraju|user4abc|xyzuser|
+--------------------+------+--------+--------+-------+--------+-------+
|[0.0, 0.0, 1.0, 2.0]| 1234|abc75757| 0.0| 0.0| 0.0| 0.0|
|[0.0, 0.0, 1.0, 2.0]| 4343| xyzuser| 0.0| 0.0| 0.0| 0.0|
|[0.0, 0.0, 1.0, 2.0]| 4567| abcraju| 0.0| 0.0| 0.0| 0.0|
|[0.0, 0.0, 1.0, 2.0]| 2323|user4abc| 0.0| 0.0| 0.0| 0.0|
+--------------------+------+--------+--------+-------+--------+-------+
The UDF documentation is here. If you can rewrite your logic to avoid a UDF it will run faster, but you'd need to learn which Spark SQL functions can do the same work. This may be a good time to check whether you really need all of the data to compute the columns, or whether you can simplify your logic.
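For intuition, the crossJoin → groupBy → pivot pipeline above builds the same full n×n matrix that nested loops would. A plain-Python sketch of the shape it produces, with a stand-in distance (absolute elementwise difference) in place of the fastdtw UDF:

```python
from itertools import product

rows = [("abc75757", [0.0, 0.0, 1.0, 2.0]),
        ("abcraju",  [0.0, 0.0, 1.0, 2.0]),
        ("xyzuser",  [0.0, 0.0, 1.0, 2.0]),
        ("user4abc", [0.0, 0.0, 1.0, 2.0])]

def dist(z1, z2):
    # Stand-in for the fastdtw UDF.
    return float(sum(abs(a - b) for a, b in zip(z1, z2)))

# crossJoin ~ itertools.product over rows; pivot ~ one dict per left-hand row.
matrix = {u1: {u2: dist(v1, v2) for u2, v2 in rows} for u1, v1 in rows}
print(matrix["abc75757"])  # all 0.0 here, since the sample vectors are identical
```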