PySpark DataFrame: how to apply a scipy.optimize function by group



I have code that works well using pandas DataFrame groupby processing. However, because the file is very large (> 70 million groups), I need to convert the code to use a PySpark DataFrame. Here is the original code, using a pandas DataFrame with small example data:

import pandas as pd
import numpy as np
from scipy.optimize import minimize
df = pd.DataFrame({
    'y0': np.random.randn(20),
    'y1': np.random.randn(20),
    'x0': np.random.randn(20),
    'x1': np.random.randn(20),
    'grpVar': ['a', 'b'] * 10})
# Starting values
startVal = np.ones(2)*(1/2)
# Constraint: sum of coefficients = 1
cons = ({'type':'eq', 'fun': lambda x: 1 - sum(x)})
# Bounds on coefficients
bnds = tuple([0,1] for x in startVal)
# Define a function to calculate sum of squared differences
def SumSqDif(a, df):
    return np.sum((df['y0'] - a[0]*df['x0'])**2 + (df['y1'] - a[1]*df['x1'])**2)
# Define a function to call minimize function 
def RunMinimize(data, startVal, bnds, cons):
    ResultByGrp = minimize(SumSqDif, startVal, method='SLSQP',
                           bounds=bnds, constraints=cons, args=(data,))
    return ResultByGrp.x
# Do the calculation by applying the function by group:
# Create GroupBy object
grp_grpVar = df.groupby('grpVar')
Results = grp_grpVar.apply(RunMinimize, startVal=startVal, bnds=bnds, cons=cons)

Now I am trying to do this with a PySpark DataFrame. I converted the pandas DataFrame to a PySpark DataFrame to test the code:

sdf = sqlContext.createDataFrame(df)
type(sdf)
#  <class 'pyspark.sql.dataframe.DataFrame'>
# Create GroupBy object
Sgrp_grpVar = sdf.groupby('grpVar')
# Redefine functions
def sSumSqDif(a, sdf):
    return np.sum((sdf['y0'] - a[0]*sdf['x0'])**2 + (sdf['y1'] - a[1]*sdf['x1'])**2)
def sRunMinimize(data=sdf, startVal=startVal, bnds=bnds, cons=cons):
    ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                           bounds=bnds, constraints=cons, args=(data))
    return ResultByGrp.x
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import DoubleType
from pyspark.sql.types import StringType
udf = UserDefinedFunction(sRunMinimize, StringType())
Results = Sgrp_grpVar.agg(sRunMinimize()) 

However, when I try to define the user-defined function (udf), I get the error below. Any help correcting my error, or suggesting an alternative approach, is highly appreciated.

udf = UserDefinedFunction(sRunMinimize, StringType())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/functions.py", line 1760, in __init__
    self._judf = self._create_judf(name)........

You are trying to write a user-defined aggregate function, which cannot be done in PySpark; see https://stackoverflow.com/a/40030740.

What you can write instead is a UDF that operates on each group's data after it has been collected into a list:

First, the setup:

import pandas as pd 
import numpy as np 
from scipy.optimize import minimize
import pyspark.sql.functions as psf
from pyspark.sql.types import *
df = pd.DataFrame({
    'y0': np.random.randn(20),
    'y1': np.random.randn(20),
    'x0': np.random.randn(20), 
    'x1': np.random.randn(20),
    'grpVar': ['a', 'b'] * 10})
sdf = sqlContext.createDataFrame(df)
# Starting values
startVal = np.ones(2)*(1/2)
# Constraint: sum of coefficients = 1
cons = ({'type':'eq', 'fun': lambda x: 1 - sum(x)})
# Bounds on coefficients
bnds = tuple([0,1] for x in startVal)

We broadcast these variables, since we need them on every row of the aggregated DataFrame; broadcasting copies the values to every node, so the executors do not have to fetch them from the driver:

sc.broadcast(startVal)
sc.broadcast(bnds)
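
A side note on the two lines above: sc.broadcast returns a Broadcast handle, and the payload is normally read back through .value in code that runs on the executors. In this answer the UDF simply closes over startVal, bnds and cons, which Spark ships as part of the pickled closure anyway, but the explicit pattern looks roughly like this sketch (the _bc name and the demo function are purely illustrative):

startVal_bc = sc.broadcast(startVal)   # keep the handle returned by sc.broadcast

def uses_broadcast_values(row):
    # executor-side code reads the broadcast payload through .value
    return float(np.dot(startVal_bc.value, [row['x0'], row['x1']]))

# quick check on a few rows of the example DataFrame
sdf.rdd.map(uses_broadcast_values).take(3)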

Let's aggregate the data using collect_list. We also change the structure of the data so that there is only one data column (you could collect each column into a separate list instead, but then you would have to change the way you pass the data to the function):

Sgrp_grpVar = sdf \
    .groupby('grpVar') \
    .agg(psf.collect_list(psf.struct("y0", "y1", "x0", "x1")).alias("data"))
Sgrp_grpVar.printSchema()
    root
     |-- grpVar: string (nullable = true)
     |-- data: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- y0: double (nullable = true)
     |    |    |-- y1: double (nullable = true)
     |    |    |-- x0: double (nullable = true)
     |    |    |-- x1: double (nullable = true)
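
If you want to see what one aggregated row looks like (all of a group's (y0, y1, x0, x1) tuples packed into a single array of structs), you can peek at it; this is just an optional check:

# Each group is now a single row whose "data" column holds the whole group's rows
Sgrp_grpVar.show(1, truncate=False)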

Now we can create the UDF. The data type it returns is too complex for PySpark, which does not support numpy arrays, so we need to change the function a little:

def sSumSqDif(a, data):
    return np.sum(
        (data['y0'] - a[0]*data['x0'])**2 
        + (data['y1'] - a[1]*data['x1'])**2)
def sRunMinimize(data, startVal=startVal, bnds=bnds, cons=cons):
    # Rebuild a pandas DataFrame from the list of collected rows
    data = pd.DataFrame({k: [row[k] for row in data] for k in ["y0", "y1", "x0", "x1"]})
    ResultByGrp = minimize(sSumSqDif, startVal, method='SLSQP',
                           bounds=bnds, constraints=cons, args=(data,))
    return ResultByGrp.x.tolist()
sRunMinimize_udf = lambda startVal, bnds, cons: psf.udf(
    lambda data: sRunMinimize(data, startVal, bnds, cons), 
    ArrayType(DoubleType())
)
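
Before wiring this into Spark, it can help to sanity-check the plain Python function locally on one group's collected rows (this is only an optional test on the driver; the group value 'a' comes from the example data):

# Pull one group's collected rows back to the driver and call the function directly
one_group = Sgrp_grpVar.filter(psf.col("grpVar") == "a").select("data").first()["data"]
sRunMinimize(one_group, startVal, bnds, cons)   # a two-element list of coefficients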

Now we can apply this function to each group's collected data:

Results = Sgrp_grpVar.select(
    "grpVar", 
    sRunMinimize_udf(startVal, bnds, cons)("data").alias("res")
)
Results.show(truncate=False)
    +------+-----------------------------------------+
    |grpVar|res                                      |
    +------+-----------------------------------------+
    |b     |[0.4073139282953772, 0.5926860717046227] |
    |a     |[0.8275186444565927, 0.17248135554340727]|
    +------+-----------------------------------------+
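
If you are on Spark 2.3 or later (the sketch below assumes that, plus pyarrow installed on the cluster), a grouped-map pandas UDF is a more natural fit: Spark hands each group to your function as a regular pandas DataFrame, so the original pandas/scipy code needs almost no changes. The function name and output schema here are illustrative, reusing SumSqDif, startVal, bnds and cons from the pandas version above:

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("grpVar string, a0 double, a1 double", PandasUDFType.GROUPED_MAP)
def run_minimize_grouped(pdf):
    # pdf is a pandas DataFrame holding all rows of one group
    res = minimize(SumSqDif, startVal, method='SLSQP',
                   bounds=bnds, constraints=cons, args=(pdf,))
    return pd.DataFrame([[pdf['grpVar'].iloc[0]] + res.x.tolist()],
                        columns=['grpVar', 'a0', 'a1'])

sdf.groupby('grpVar').apply(run_minimize_grouped).show()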

That said, I don't think PySpark is the right tool for this.
