Computing MAPE and applying it to a grouped PySpark DataFrame [pandas_udf]



Goal: compute the mean_absolute_percentage_error (MAPE) for each unique ID.

  • y - actual value
  • yhat - predicted value
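
For reference, MAPE here is the mean of |y - yhat| / |y| over a group's rows. A minimal sketch using the first ID's values from the sample DataFrame below, assuming scikit-learn >= 0.24 (where mean_absolute_percentage_error was added):

from sklearn.metrics import mean_absolute_percentage_error

# Values for ID 'Ax849b' from the sample DataFrame below
y = [1165.59, 1120.69, 1120.69, 1120.69]
yhat = [1298.809, 1295.552, 1294.079, 1295.0399]

# mean(|y - yhat| / |y|) -> ~0.1452, the per-ID value expected in the output
print(mean_absolute_percentage_error(y, yhat))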

Sample PySpark DataFrame: join_df

+----------+----------+-------+---------+----------+----------+
|        ID|        ds|      y|     yhat|yhat_upper|yhat_lower|
+----------+----------+-------+---------+----------+----------+
|    Ax849b|2021-07-01|1165.59| 1298.809| 1939.1261| 687.48206|
|    Ax849b|2021-07-02|1120.69| 1295.552| 1892.4929|   693.786|
|    Ax849b|2021-07-03|1120.69| 1294.079| 1923.0253|  664.1514|
|    Ax849b|2021-07-04|1120.69|1295.0399| 1947.6392|  639.4879|
|    Bz383J|2021-07-03|1108.71|1159.4934| 1917.6515| 652.76624|
|    Bz383J|2021-07-04|1062.77|1191.2385| 1891.9268|  665.9529|
+----------+----------+-------+---------+----------+----------+
final_schema = StructType([
    StructField('ds', DateType()),
    StructField('ID', IntegerType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('yhat_lower', FloatType()),
    StructField('mape', FloatType())
])

I tried creating a UDF and applying it to each ID with a grouped apply:

from sklearn.metrics import mean_absolute_percentage_error
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf(final_schema, PandasUDFType.GROUPED_MAP)
def gr_mape_val(join_df):
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"])
    join_df['mape'] = mape
    return join_df

df_apply = join_df.groupby('ID').applyInPandas(gr_mape_val, final_schema)
df_apply.show()

However, I get this error:

PythonException: 'TypeError: Return type of the user-defined function should be pandas.DataFrame, but is <class 'numpy.float32'>'

I understand that I am getting the MAPE back as a numpy output when it should be a DataFrame. But I am not sure what exactly I need to do differently to get the MAPE per ID.

With PandasUDFType.GROUPED_MAP you need to return a DataFrame; since you are returning a numpy scalar instead, you see that exception.

You also need to fix the schema so that it matches the DataFrame your function finally returns for each group: for example, ID holds string values like 'Ax849b', so it needs StringType rather than IntegerType.

You should also use applyInPandas, since the GROUPED_MAP pandas UDF with apply is deprecated as of Spark 3.0; I have added its usage below as well.

Data Preparation

from io import StringIO
import pandas as pd

s = StringIO("""
ID,ds,y,yhat,yhat_upper,yhat_lower
Ax849b,2021-07-01,1165.59,1298.809,1939.1261,687.48206
Ax849b,2021-07-02,1120.69,1295.552,1892.4929,693.786
Ax849b,2021-07-03,1120.69,1294.079,1923.0253,664.1514
Ax849b,2021-07-04,1120.69,1295.0399,1947.6392,639.4879
Bz383J,2021-07-03,1108.71,1159.4934,1917.6515,652.76624
Bz383J,2021-07-04,1062.77,1191.2385,1891.9268,665.9529
""")
df = pd.read_csv(s, delimiter=',')

# `sql` is the active SparkSession
sparkDF = sql.createDataFrame(df)
sparkDF.show()
+------+----------+-------+---------+----------+----------+
|    ID|        ds|      y|     yhat|yhat_upper|yhat_lower|
+------+----------+-------+---------+----------+----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 1939.1261| 687.48206|
|Ax849b|2021-07-02|1120.69| 1295.552| 1892.4929|   693.786|
|Ax849b|2021-07-03|1120.69| 1294.079| 1923.0253|  664.1514|
|Ax849b|2021-07-04|1120.69|1295.0399| 1947.6392|  639.4879|
|Bz383J|2021-07-03|1108.71|1159.4934| 1917.6515| 652.76624|
|Bz383J|2021-07-04|1062.77|1191.2385| 1891.9268|  665.9529|
+------+----------+-------+---------+----------+----------+

Pandas UDF - Usage

from pyspark.sql import functions as F
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, FloatType
from sklearn.metrics import mean_absolute_percentage_error

final_schema = StructType([
    StructField('ID', StringType()),
    StructField('ds', StringType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_lower', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('mape', FloatType())
])

@F.pandas_udf(final_schema, PandasUDFType.GROUPED_MAP)
def gr_mape_val(join_df):
    # MAPE over the whole group, broadcast to every row
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"])
    join_df['mape'] = mape
    return join_df

sparkDF.groupby('ID').apply(gr_mape_val).show()
+------+----------+-------+---------+----------+----------+-----------+
|    ID|        ds|      y|     yhat|yhat_lower|yhat_upper|       mape|
+------+----------+-------+---------+----------+----------+-----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 687.48206| 1939.1261| 0.14515346|
|Ax849b|2021-07-02|1120.69| 1295.552|   693.786| 1892.4929| 0.14515346|
|Ax849b|2021-07-03|1120.69| 1294.079|  664.1514| 1923.0253| 0.14515346|
|Ax849b|2021-07-04|1120.69|1295.0399|  639.4879| 1947.6392| 0.14515346|
|Bz383J|2021-07-03|1108.71|1159.4934| 652.76624| 1917.6515|0.083342426|
|Bz383J|2021-07-04|1062.77|1191.2385|  665.9529| 1891.9268|0.083342426|
+------+----------+-------+---------+----------+----------+-----------+

applyInPandas - Usage

final_schema = StructType([
    StructField('ID', StringType()),
    StructField('ds', StringType()),
    StructField('y', FloatType()),
    StructField('yhat', FloatType()),
    StructField('yhat_lower', FloatType()),
    StructField('yhat_upper', FloatType()),
    StructField('mape', FloatType())
])

def gr_mape_val(join_df):
    # Plain function, no decorator; the schema is passed to applyInPandas instead
    mape = mean_absolute_percentage_error(join_df["y"], join_df["yhat"])
    join_df['mape'] = mape
    return join_df

sparkDF.groupby('ID').applyInPandas(gr_mape_val, final_schema).show()
+------+----------+-------+---------+----------+----------+-----------+
|    ID|        ds|      y|     yhat|yhat_lower|yhat_upper|       mape|
+------+----------+-------+---------+----------+----------+-----------+
|Ax849b|2021-07-01|1165.59| 1298.809| 687.48206| 1939.1261| 0.14515346|
|Ax849b|2021-07-02|1120.69| 1295.552|   693.786| 1892.4929| 0.14515346|
|Ax849b|2021-07-03|1120.69| 1294.079|  664.1514| 1923.0253| 0.14515346|
|Ax849b|2021-07-04|1120.69|1295.0399|  639.4879| 1947.6392| 0.14515346|
|Bz383J|2021-07-03|1108.71|1159.4934| 652.76624| 1917.6515|0.083342426|
|Bz383J|2021-07-04|1062.77|1191.2385|  665.9529| 1891.9268|0.083342426|
+------+----------+-------+---------+----------+----------+-----------+
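
If you only need a single MAPE row per ID instead of repeating the value on every row, a native Spark aggregation is a possible alternative; a sketch without any pandas UDF, assuming the same sparkDF as above:

import pyspark.sql.functions as F

# mean(|y - yhat| / |y|) per ID as a one-row-per-group aggregate
mape_per_id = (
    sparkDF
    .withColumn('ape', F.abs((F.col('y') - F.col('yhat')) / F.col('y')))
    .groupBy('ID')
    .agg(F.avg('ape').alias('mape'))
)
mape_per_id.show()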
