I have a DataFrame with the following schema:
+--------------------+--------+-------+-------+-----------+--------------------+
|              userid|datadate|  runid|variant|device_type|          prediction|
+--------------------+--------+-------+-------+-----------+--------------------+
|0001d15b-e2da-4f4...|20220111|1196752|      1|     Mobile|  0.8827571312010658|
|00021723-2a0d-497...|20220111|1196752|      1|     Mobile| 0.30763173370229735|
|00021723-2a0d-497...|20220111|1196752|      0|     Mobile|  0.5336206154783815|
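I want to do the following: for each "runid" and each "device_type", run some calculations on the rows with variant == 1 and the rows with variant == 0, including a resampling (bootstrap) loop. The end goal is to store the results of these calculations in another DataFrame.
So, with a naive approach, the code looks like this: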
for runid in df.select('runid').distinct().rdd.flatMap(list).collect():
    for device in ["Mobile", "Desktop"]:
        a_variant = df.filter((df.runid == runid) & (df.device_type == device) & (df.variant == 0))
        b_variant = df.filter((df.runid == runid) & (df.device_type == device) & (df.variant == 1))
        ## do some more calculations here
        # bootstrap loop:
        for samp in range(100):
            # vary the seed per iteration: a fixed seed=123 would draw the same sample on every iteration
            sampled_vector_a = a_variant.select("prediction").sample(withReplacement=True, fraction=1.0, seed=samp)
            sampled_vector_b = b_variant.select("prediction").sample(withReplacement=True, fraction=1.0, seed=samp)
            ## do some more calculations here
        ## do some more calculations here
## store calculations in a new DataFrame
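This process is currently too slow. How can I make the best use of Spark to speed it up? Thanks
Here is a way to sample from each group of a DataFrame after applying groupBy.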
from pyspark.sql import SparkSession
import pandas as pd  # applyInPandas needs pandas (and PyArrow) installed

spark = SparkSession.builder.appName("Demo").getOrCreate()

# define the data before creating the DataFrame from it
data = [["uid1","runid1",1,"Mobile",0.8],["uid2","runid1",1,"Mobile",0.3],
        ["uid3","runid1",0,"Mobile",0.5],["uid4","runid2",0,"Mobile",0.7],
        ["uid5","runid2",0,"Mobile",0.9]]
columns = ["userid","runid","variant","device_type","prediction"]
df = spark.createDataFrame(data, columns)
df.show()
# +------+------+-------+-----------+----------+
# |userid| runid|variant|device_type|prediction|
# +------+------+-------+-----------+----------+
# |  uid1|runid1|      1|     Mobile|       0.8|
# |  uid2|runid1|      1|     Mobile|       0.3|
# |  uid3|runid1|      0|     Mobile|       0.5|
# |  uid4|runid2|      0|     Mobile|       0.7|
# |  uid5|runid2|      0|     Mobile|       0.9|
# +------+------+-------+-----------+----------+
Define the sampling function that applyInPandas will call. The function my_sample draws one random row from each group's pandas DataFrame:
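def my_sample(key, df):
    # key: tuple of grouping values; df: pandas DataFrame with all rows of that group
    x = df.sample(n=1)  # pick one random row
    return x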
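my_sample keeps a single row per group. For the bootstrap described in the question, the same kind of per-group function could instead draw all 100 resamples of a group's predictions in one call and return one row per resample. A minimal sketch, assuming a hypothetical function bootstrap_means and a hypothetical constant N_BOOT; it only uses pandas and NumPy, so it can be tested locally on a small pandas DataFrame before being passed to applyInPandas:

```python
import numpy as np
import pandas as pd

N_BOOT = 100  # hypothetical: resamples per group, matching range(100) in the question

def bootstrap_means(key, pdf):
    # key: grouping values (runid, device_type, variant) passed by applyInPandas
    # pdf: pandas DataFrame holding every row of that group
    runid, device_type, variant = key
    values = pdf["prediction"].to_numpy()
    n = len(values)
    rng = np.random.default_rng()  # unseeded, so each call gets fresh draws
    # Row i of idx holds n row indices drawn with replacement (one bootstrap resample)
    idx = rng.integers(0, n, size=(N_BOOT, n))
    means = values[idx].mean(axis=1)  # one mean prediction per resample
    return pd.DataFrame({
        "runid": runid,
        "device_type": device_type,
        "variant": variant,
        "sample_id": np.arange(N_BOOT),
        "mean_prediction": means,
    })

# With Spark (not run here), the output schema can be given as a DDL string:
# boot = df.groupby("runid", "device_type", "variant").applyInPandas(
#     bootstrap_means,
#     schema="runid string, device_type string, variant long, "
#            "sample_id long, mean_prediction double")
```

One design difference from the question's loop: DataFrame.sample(withReplacement=True, fraction=1.0) uses Poisson sampling, so each Spark sample only has approximately the original size, whereas this sketch draws exactly n rows per resample.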
The output of applyInPandas also needs a schema. Since my_sample returns whole rows, the schema has the same fields as df:
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

schema = StructType([StructField('userid', StringType()),
                     StructField('runid', StringType()),
                     StructField('variant', LongType()),
                     StructField('device_type', StringType()),
                     StructField('prediction', DoubleType())])
Just as a check, grouping the data shows that there are three groups:
df.groupby("runid", "device_type", "variant").mean("prediction").show()
# +------+-----------+-------+---------------+
# | runid|device_type|variant|avg(prediction)|
# +------+-----------+-------+---------------+
# |runid1|     Mobile|      0|            0.5|
# |runid1|     Mobile|      1|           0.55|
# |runid2|     Mobile|      0|            0.8|
# +------+-----------+-------+---------------+
Now apply my_sample to each group with applyInPandas:
df.groupby("runid","device_type","variant").applyInPandas(my_sample, schema=schema).show()
# +------+------+-------+-----------+----------+
# |userid| runid|variant|device_type|prediction|
# +------+------+-------+-----------+----------+
# |  uid3|runid1|      0|     Mobile|       0.5|
# |  uid2|runid1|      1|     Mobile|       0.3|
# |  uid4|runid2|      0|     Mobile|       0.7|
# +------+------+-------+-----------+----------+
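For the question's actual goal, comparing variant 0 with variant 1 for every runid and device_type, one option is to group only by runid and device_type, so that each pandas call sees both variants, and run the whole bootstrap loop inside the function. A minimal sketch; the function name compare_variants, the output columns, and the choice of a 95% percentile interval for the difference in mean prediction are all hypothetical stand-ins for the "more calculations" the question leaves unspecified:

```python
import numpy as np
import pandas as pd

N_BOOT = 100  # hypothetical: bootstrap iterations, matching range(100) in the question

def compare_variants(key, pdf):
    # key: (runid, device_type); pdf: all rows of that group, both variants together
    runid, device_type = key
    a = pdf.loc[pdf["variant"] == 0, "prediction"].to_numpy()
    b = pdf.loc[pdf["variant"] == 1, "prediction"].to_numpy()
    row = {
        "runid": runid,
        "device_type": device_type,
        "n_a": len(a),
        "n_b": len(b),
        "observed_diff": np.nan,
        "ci_low": np.nan,
        "ci_high": np.nan,
    }
    # Only compute when both variants are present (runid2 in the example has no variant 1)
    if len(a) > 0 and len(b) > 0:
        rng = np.random.default_rng()
        # Each row of idx_a / idx_b is one bootstrap resample drawn with replacement,
        # the same size as the original sample of that variant
        idx_a = rng.integers(0, len(a), size=(N_BOOT, len(a)))
        idx_b = rng.integers(0, len(b), size=(N_BOOT, len(b)))
        diffs = b[idx_b].mean(axis=1) - a[idx_a].mean(axis=1)
        # 95% percentile interval of the bootstrapped difference in means
        ci_low, ci_high = np.percentile(diffs, [2.5, 97.5])
        row.update(observed_diff=b.mean() - a.mean(), ci_low=ci_low, ci_high=ci_high)
    return pd.DataFrame([row])

# With Spark (not run here):
# result = df.groupby("runid", "device_type").applyInPandas(
#     compare_variants,
#     schema="runid string, device_type string, n_a long, n_b long, "
#            "observed_diff double, ci_low double, ci_high double")
```

This replaces the driver-side loops and the 100 separate sample() calls per group with a single distributed job whose result is already the new DataFrame of calculations. The trade-off is that applyInPandas loads all rows of a group into memory on one executor, so this assumes each (runid, device_type) group fits in memory.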
Note: I used applyInPandas because pyspark.sql.GroupedData.apply is deprecated (since Spark 3.0).