Efficient nested loop for bootstrap resampling of a DataFrame in PySpark



I have a DataFrame with the following schema:

+--------------------+--------+-------+-------+-----------+--------------------+
|              userid|datadate|  runid|variant|device_type|          prediction|
+--------------------+--------+-------+-------+-----------+--------------------+
|0001d15b-e2da-4f4...|20220111|1196752|      1|     Mobile|  0.8827571312010658|
|00021723-2a0d-497...|20220111|1196752|      1|     Mobile| 0.30763173370229735|
|00021723-2a0d-497...|20220111|1196752|      0|     Mobile|  0.5336206154783815|

Here is what I want to do: for each "runid" and for each "device_type", perform some calculations on variant == 1 and variant == 0, including a resampling loop. The end goal is to store these calculations in another DataFrame.

So, in a naive approach, the code looks like this:

for runid in df.select('runid').distinct().rdd.flatMap(list).collect():
    for device in ["Mobile", "Desktop"]:
        a_variant = df.filter((df.runid == runid) & (df.device_type == device) & (df.variant == 0))
        b_variant = df.filter((df.runid == runid) & (df.device_type == device) & (df.variant == 1))
        ## do some more calculations here
        # bootstrap loop:
        for samp in range(100):
            sampled_vector_a = a_variant.select("prediction").sample(withReplacement=True, fraction=1.0, seed=123)
            sampled_vector_b = b_variant.select("prediction").sample(withReplacement=True, fraction=1.0, seed=123)
            ## do some more calculations here
        ## do some more calculations here
        ## store calculations in a new DataFrame

Currently this process is far too slow. What is the best way to leverage Spark to optimize it? Thanks.

Here is how to sample from each group of a DataFrame after applying groupBy.

from pyspark.sql import SparkSession
import pandas as pd

spark = SparkSession.builder.appName("Demo").getOrCreate()

data = [["uid1", "runid1", 1, "Mobile", 0.8], ["uid2", "runid1", 1, "Mobile", 0.3],
        ["uid3", "runid1", 0, "Mobile", 0.5], ["uid4", "runid2", 0, "Mobile", 0.7],
        ["uid5", "runid2", 0, "Mobile", 0.9]]
columns = ["userid", "runid", "variant", "device_type", "prediction"]
df = spark.createDataFrame(data, columns)
df.show()
# +------+------+-------+-----------+----------+
# |userid| runid|variant|device_type|prediction|
# +------+------+-------+-----------+----------+
# |  uid1|runid1|      1|     Mobile|       0.8|
# |  uid2|runid1|      1|     Mobile|       0.3|
# |  uid3|runid1|      0|     Mobile|       0.5|
# |  uid4|runid2|      0|     Mobile|       0.7|
# |  uid5|runid2|      0|     Mobile|       0.9|
# +------+------+-------+-----------+----------+

Define the sampling function that applyInPandas will call. The function my_sample draws one sample from each input dataframe:

def my_sample(key, df):
    # key is a tuple of the grouping values; df holds the group's rows as a pandas DataFrame
    x = df.sample(n=1)
    return x
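
If each bootstrap iteration should instead draw a full with-replacement resample of the group (as in the question's inner loop), the same function shape works. A minimal sketch, where my_resample is an illustrative name:

def my_resample(key, df):
    # one bootstrap resample: same number of rows as the group, drawn with replacement
    return df.sample(n=len(df), replace=True)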

applyInPandas also needs a schema for its output; since my_sample returns the entire dataframe, the output will have the same fields as df:

from pyspark.sql.types import *

schema = StructType([StructField('userid', StringType()),
                     StructField('runid', StringType()),
                     StructField('variant', LongType()),
                     StructField('device_type', StringType()),
                     StructField('prediction', DoubleType())])
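
Since my_sample returns the group's rows unchanged, reusing the input DataFrame's schema is an equivalent shortcut (a small convenience, assuming the function neither adds nor drops columns):

# equivalent, because the output columns match the input exactly
df.groupby("runid", "device_type", "variant").applyInPandas(my_sample, schema=df.schema)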

Just as a check, try grouping the data; there are three groups:

df.groupby("runid", "device_type", "variant").mean("prediction").show()
# +------+-----------+-------+---------------+
# | runid|device_type|variant|avg(prediction)|
# +------+-----------+-------+---------------+
# |runid1|     Mobile|      0|            0.5|
# |runid1|     Mobile|      1|           0.55|
# |runid2|     Mobile|      0|            0.8|
# +------+-----------+-------+---------------+

Now apply my_sample to each group using applyInPandas:

df.groupby("runid","device_type","variant").applyInPandas(my_sample, schema=schema).show()
# +------+------+-------+-----------+----------+
# |userid| runid|variant|device_type|prediction|
# +------+------+-------+-----------+----------+
# |  uid3|runid1|      0|     Mobile|       0.5|
# |  uid2|runid1|      1|     Mobile|       0.3|
# |  uid4|runid2|      0|     Mobile|       0.7|
# +------+------+-------+-----------+----------+
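
To get all the way back to the original question, one option is to group by runid and device_type only, run the 100-iteration bootstrap inside the pandas function, and return one summary row per group. This is only a sketch: it assumes the per-group statistic is a difference of means between the variants, and bootstrap_diff, mean_diff, ci_low and ci_high are illustrative names.

import numpy as np
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

boot_schema = StructType([StructField('runid', StringType()),
                          StructField('device_type', StringType()),
                          StructField('mean_diff', DoubleType()),
                          StructField('ci_low', DoubleType()),
                          StructField('ci_high', DoubleType())])

def bootstrap_diff(key, pdf):
    runid, device_type = key
    a = pdf.loc[pdf["variant"] == 0, "prediction"].to_numpy()
    b = pdf.loc[pdf["variant"] == 1, "prediction"].to_numpy()
    if len(a) == 0 or len(b) == 0:
        # skip groups where one of the variants is missing
        return pd.DataFrame(columns=["runid", "device_type", "mean_diff", "ci_low", "ci_high"])
    rng = np.random.default_rng()
    diffs = []
    for _ in range(100):
        # resample both variants with replacement, keeping the original group sizes
        samp_a = rng.choice(a, size=len(a), replace=True)
        samp_b = rng.choice(b, size=len(b), replace=True)
        diffs.append(samp_b.mean() - samp_a.mean())
    lo, hi = np.percentile(diffs, [2.5, 97.5])
    return pd.DataFrame([{"runid": runid, "device_type": device_type,
                          "mean_diff": float(np.mean(diffs)),
                          "ci_low": float(lo), "ci_high": float(hi)}])

df.groupby("runid", "device_type").applyInPandas(bootstrap_diff, schema=boot_schema).show()

This keeps all of the resampling inside one Spark job instead of launching hundreds of small driver-side jobs, which is where the naive loop loses its time.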

Note: I used applyInPandas because pyspark.sql.GroupedData.apply is deprecated.