How can I create an array with a normal distribution in PySpark using scipy.stats with a UDF (or any other way)?



I am currently migrating a Python script to PySpark. The Python version works fine:

### PYTHON
import pandas as pd
import scipy.stats as st

def fnNormalDistribution(mean, std, n):
    box = list(eval('st.norm')(*[mean, std]).rvs(n))
    return box

df = pd.DataFrame([[18.2500365, 2.7105814157004193],
                   [9.833353, 2.121324586200329],
                   [41.55563866666666, 7.118716782527054]],
                  columns=['mean', 'std'])
df 

|    mean    |    std   |
|------------|----------|
|   18.250037|  2.710581|
|    9.833353|  2.121325|
|   41.555639|  7.118717|
n = 100 #Example
df['random_values'] = df.apply(lambda row: fnNormalDistribution(row["mean"], row["std"], n), axis=1)
df
|    mean    |    std   |                   random_values                  |
|------------|----------|--------------------------------------------------|
|   18.250037|  2.710581|[17.752189993958638, 18.883038367927465, 16.39...]|
|    9.833353|  2.121325|[10.31806454283759, 8.732261487201594, 11.6782...]|
|   41.555639|  7.118717|[38.17469739795093, 43.16514466083524, 49.2668...]|

But when I try to migrate it to PySpark, I get the following error:

### PYSPARK
import pyspark.sql.functions as f
import pyspark.sql.types as t

def fnNormalDistribution(mean, std, n):
    box = list(eval('st.norm')(*[mean, std]).rvs(n))
    return box

udf_fnNomalDistribution = f.udf(fnNormalDistribution, t.ArrayType(t.DoubleType()))

columns = ['mean', 'std']
data = [(18.2500365, 2.7105814157004193),
        (9.833353, 2.121324586200329),
        (41.55563866666666, 7.118716782527054)]
df = spark.createDataFrame(data=data, schema=columns)
df.show()
|    mean    |    std   |
|------------|----------|
|   18.250037|  2.710581|
|    9.833353|  2.121325|
|   41.555639|  7.118717|

df = df.withColumn('random_values', udf_fnNomalDistribution('mean','std',f.lit(n)))
df.show()
PythonException: 
An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
File "C:Sparkspark-3.1.2-bin-hadoop3.2pythonlibpyspark.zippysparkworker.py", line 604, in main
File "C:Sparkspark-3.1.2-bin-hadoop3.2pythonlibpyspark.zippysparkworker.py", line 596, in process
File "C:Sparkspark-3.1.2-bin-hadoop3.2pythonlibpyspark.zippysparkserializers.py", line 211, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "C:Sparkspark-3.1.2-bin-hadoop3.2pythonlibpyspark.zippysparkserializers.py", line 132, in dump_stream
for obj in iterator:
File "C:Sparkspark-3.1.2-bin-hadoop3.2pythonlibpyspark.zippysparkserializers.py", line 200, in _batched
for item in iterator:
File "C:Sparkspark-3.1.2-bin-hadoop3.2pythonlibpyspark.zippysparkworker.py", line 450, in mapper
File "C:Sparkspark-3.1.2-bin-hadoop3.2pythonlibpyspark.zippysparkworker.py", line 450, in <genexpr>
File "C:Sparkspark-3.1.2-bin-hadoop3.2pythonlibpyspark.zippysparkworker.py", line 85, in <lambda>
File "C:Sparkspark-3.1.2-bin-hadoop3.2pythonlibpyspark.zippysparkutil.py", line 73, in wrapper
return f(*args, **kwargs)
File "C:UsersUbitsAppDataLocalTemp/ipykernel_10604/2493247477.py", line 2, in fnNormalDistribution
File "<string>", line 1, in <module>
NameError: name 'st' is not defined

Is there any way to use the same function in PySpark, or another way to get the random_values column? I've googled around without any luck.

Thanks

I tried this and, as samkart suggested, it can be fixed by moving the import of st inside fnNormalDistribution; a sketch of that fix is below.
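Here is a minimal sketch of that fix, assuming the same spark session, DataFrame df, and n from the question (names kept only for illustration). The scipy import sits inside the UDF body so the Spark worker processes can resolve it:

import pyspark.sql.functions as f
import pyspark.sql.types as t

def fnNormalDistribution(mean, std, n):
    import scipy.stats as st  # imported here so the import runs on the Spark workers
    return st.norm(mean, std).rvs(n).tolist()

udf_fnNormalDistribution = f.udf(fnNormalDistribution, t.ArrayType(t.DoubleType()))
df = df.withColumn('random_values', udf_fnNormalDistribution('mean', 'std', f.lit(n)))
df.show()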

I will leave my example here anyway, because Fugue may provide a more readable way to bring this to Spark, especially when it comes to handling the schema. The full code is below.

import pandas as pd

def fnNormalDistribution(mean, std, n):
    import scipy.stats as st
    box = (eval('st.norm')(*[mean, std]).rvs(n)).tolist()
    return box

df = pd.DataFrame([[18.2500365, 2.7105814157004193],
                   [9.833353, 2.121324586200329],
                   [41.55563866666666, 7.118716782527054]],
                  columns=['mean', 'std'])

n = 100  # Example

def helper(df: pd.DataFrame) -> pd.DataFrame:
    df['random_values'] = df.apply(lambda row: fnNormalDistribution(row["mean"], row["std"], n), axis=1)
    return df

from fugue import transform
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# transform can take either a pandas or a Spark DataFrame as input
# If engine is None, it runs on pandas
sdf = transform(df,
                helper,
                schema="*, random_values:[float]",
                engine=spark)

sdf.show()
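For reference, the schema string "*, random_values:[float]" in the transform call tells Fugue to keep all input columns (the *) and append a random_values column typed as an array of floats, which is how the output Spark schema gets defined without writing a StructType by hand.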
