I have a Spark DataFrame, and one of its columns holds mostly 1s and very few 0s (only 0.01% zeros).
I'd like to take a random subsample, but a stratified one, so that it keeps the ratio of 1s to 0s in that column.
Is this possible in PySpark?
I am looking for a non-Scala solution, and one based on DataFrames rather than RDDs.
The solution I suggested in Stratified sampling in Spark is fairly straightforward to convert from Scala to Python (or even to Java - see What's the easiest way to stratify a Spark Dataset?).
Nevertheless, I'll rewrite it here in Python. Let's start by creating a toy DataFrame:
from pyspark.sql.functions import lit
list = [(2147481832,23355149,1), (2147481832,973010692,1), (2147481832,2134870842,1),
        (2147481832,541023347,1), (2147481832,1682206630,1), (2147481832,1138211459,1),
        (2147481832,852202566,1), (2147481832,201375938,1), (2147481832,486538879,1),
        (2147481832,919187908,1), (214748183,919187908,1), (214748183,91187908,1)]
df = spark.createDataFrame(list, ["x1", "x2", "x3"])
df.show()
# +----------+----------+---+
# | x1| x2| x3|
# +----------+----------+---+
# |2147481832| 23355149| 1|
# |2147481832| 973010692| 1|
# |2147481832|2134870842| 1|
# |2147481832| 541023347| 1|
# |2147481832|1682206630| 1|
# |2147481832|1138211459| 1|
# |2147481832| 852202566| 1|
# |2147481832| 201375938| 1|
# |2147481832| 486538879| 1|
# |2147481832| 919187908| 1|
# | 214748183| 919187908| 1|
# | 214748183| 91187908| 1|
# +----------+----------+---+
You can see that this DataFrame has 12 elements:
df.count()
# 12
distributed across the keys as follows:
df.groupBy("x1").count().show()
# +----------+-----+
# | x1|count|
# +----------+-----+
# |2147481832| 10|
# | 214748183| 2|
# +----------+-----+
Now let's sample. First, we'll set the seed:
seed = 12
Then find the keys, assign a sampling fraction to each, and sample:
fractions = df.select("x1").distinct().withColumn("fraction", lit(0.8)).rdd.collectAsMap()
print(fractions)
# {2147481832: 0.8, 214748183: 0.8}
sampled_df = df.stat.sampleBy("x1", fractions, seed)
sampled_df.show()
# +----------+---------+---+
# | x1| x2| x3|
# +----------+---------+---+
# |2147481832| 23355149| 1|
# |2147481832|973010692| 1|
# |2147481832|541023347| 1|
# |2147481832|852202566| 1|
# |2147481832|201375938| 1|
# |2147481832|486538879| 1|
# |2147481832|919187908| 1|
# | 214748183|919187908| 1|
# | 214748183| 91187908| 1|
# +----------+---------+---+
We can now check the content of our sample:
sampled_df.count()
# 9
sampled_df.groupBy("x1").count().show()
# +----------+-----+
# | x1|count|
# +----------+-----+
# |2147481832| 7|
# | 214748183| 2|
# +----------+-----+
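Note that sampleBy only approximates the requested fractions (it does Bernoulli sampling per row), which is why the big stratum came back with 7 rows instead of 8. If you need exact per-stratum counts, here is a minimal sketch using the pair-RDD API's sampleByKeyExact (it makes additional passes over the data):
# Exact stratified sampling via the RDD API, then back to a DataFrame
exact_rdd = (df.rdd.map(lambda row: (row["x1"], row))
             .sampleByKeyExact(False, fractions, seed)
             .values())
exact_df = spark.createDataFrame(exact_rdd, df.schema)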
Suppose you have the Titanic dataset in a DataFrame data, and you want to split it into train and test sets using stratified sampling on the "Survived" target variable.
# Check initial distributions of 0's and 1's
data.groupBy("Survived").count().show()
+--------+-----+
|Survived|count|
+--------+-----+
|       1|  342|
|       0|  549|
+--------+-----+
# Taking 70% of both 0's and 1's into training set
train = data.sampleBy("Survived", fractions={0: 0.7, 1: 0.7}, seed=10)
# Subtracting 'train' from original 'data' to get test set
test = data.subtract(train)
# Checking distributions of 0's and 1's in train and test sets after the sampling
train.groupBy("Survived").count().show()
+--------+-----+
|Survived|count|
+--------+-----+
| 1| 239|
| 0| 399|
+--------+-----+
test.groupBy("Survived").count().show()
+--------+-----+
|Survived|count|
+--------+-----+
| 1| 103|
| 0| 150|
+--------+-----+
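As a quick sanity check (a sketch, assuming Survived is stored as a numeric 0/1 column), you can compare the share of 1s in the full data with each split; stratified sampling should keep the ratios roughly equal:
# Compare the proportion of Survived == 1 in the full data, train and test sets
for name, d in [("data", data), ("train", train), ("test", test)]:
    ones = d.filter(d["Survived"] == 1).count()
    print(name, round(ones / d.count(), 3))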
This can be done quite easily in PySpark using randomSplit and union.
# read in data
df = spark.read.csv(file, header=True)
# split dataframes between 0s and 1s
zeros = df.filter(df["Target"]==0)
ones = df.filter(df["Target"]==1)
# split datasets into training and testing
train0, test0 = zeros.randomSplit([0.8,0.2], seed=1234)
train1, test1 = ones.randomSplit([0.8,0.2], seed=1234)
# stack datasets back together
train = train0.union(train1)
test = test0.union(test1)
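If the target has more than two classes, the same randomSplit/union idea generalizes; here is a minimal sketch under the same assumptions (a DataFrame df with a "Target" column):
from functools import reduce
# One randomSplit per class, then stack the per-class pieces back together
classes = [r["Target"] for r in df.select("Target").distinct().collect()]
splits = [df.filter(df["Target"] == c).randomSplit([0.8, 0.2], seed=1234) for c in classes]
train = reduce(lambda a, b: a.union(b), [s[0] for s in splits])
test = reduce(lambda a, b: a.union(b), [s[1] for s in splits])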
This is based on the accepted answer from @eliasah and this SO thread.
If you want to get a train set and a test set back, you can use the following function:
from pyspark.sql import functions as F

def stratified_split_train_test(df, frac, label, join_on, seed=42):
    """Stratified split of a dataframe into a train and test set.
    inspiration gotten from:
    https://stackoverflow.com/a/47672336/1771155
    https://stackoverflow.com/a/39889263/1771155"""
    fractions = df.select(label).distinct().withColumn("fraction", F.lit(frac)).rdd.collectAsMap()
    df_frac = df.stat.sampleBy(label, fractions, seed)
    df_remaining = df.join(df_frac, on=join_on, how="left_anti")
    return df_frac, df_remaining
Create a stratified train and test set, with 80% of the data going to the training set:
df_train, df_test = stratified_split_train_test(df=df, frac=0.8, label="y", join_on="unique_id")
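A small follow-up check (a sketch, assuming unique_id really is unique per row): the left-anti join guarantees that the two sets are disjoint and together cover the original DataFrame.
# Verify the split: no overlap, and train + test add up to the full row count
assert df_train.join(df_test, on="unique_id", how="inner").count() == 0
assert df_train.count() + df_test.count() == df.count()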
You can use the function below; I put it together by combining the other answers.
import pyspark.sql.functions as f
from pyspark.sql import DataFrame as SparkDataFrame

def train_test_split_pyspark(
    df: SparkDataFrame,
    stratify_column: str,
    unique_col: str = None,
    train_fraction: float = 0.05,
    validation_fraction: float = 0.005,
    test_fraction: float = 0.005,
    seed: int = 1234,
    to_pandas: bool = True,
):
    if not unique_col:
        unique_col = "any_unique_name_here"
        df = df.withColumn(unique_col, f.monotonically_increasing_id())

    # Train data
    train_fraction_dict = (
        df.select(stratify_column)
        .distinct()
        .withColumn("fraction", f.lit(train_fraction))
        .rdd.collectAsMap()
    )
    df_train = df.stat.sampleBy(stratify_column, train_fraction_dict, seed)
    df_remaining = df.join(df_train, on=unique_col, how="left_anti")

    # Validation data
    validation_fraction_dict = {
        key: validation_fraction for key in train_fraction_dict
    }
    df_val = df_remaining.stat.sampleBy(stratify_column, validation_fraction_dict, seed)
    df_remaining = df_remaining.join(df_val, on=unique_col, how="left_anti")

    # Test data
    test_fraction_dict = {
        key: test_fraction for key in train_fraction_dict
    }
    df_test = df_remaining.stat.sampleBy(stratify_column, test_fraction_dict, seed)

    # Drop the helper column if we created it ourselves
    if unique_col == "any_unique_name_here":
        df_train = df_train.drop(unique_col)
        df_val = df_val.drop(unique_col)
        df_test = df_test.drop(unique_col)

    if to_pandas:
        return (df_train.toPandas(), df_val.toPandas(), df_test.toPandas())
    return df_train, df_val, df_test
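For example, a hypothetical call (assuming a DataFrame df with a label column "y"); note that validation_fraction and test_fraction are applied to the rows left over from the previous step, so 0.8 / 0.5 / 1.0 yields roughly an 80/10/10 split:
# Hypothetical usage: stratified train/validation/test split on "y", returned as Spark DataFrames
df_train, df_val, df_test = train_test_split_pyspark(
    df,
    stratify_column="y",
    train_fraction=0.8,
    validation_fraction=0.5,
    test_fraction=1.0,
    to_pandas=False,
)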
To avoid rows ending up in both the train and test sets, or disappearing from both, I would add the following to Vincent Claes's solution:
import pyspark.sql.functions as f
from pyspark.sql import DataFrame

def stratifiedSampler(sparkDf: DataFrame, ratio: float,
                      label: str, joinOn: str, seed=42):
    fractions = (sparkDf.select(label).distinct()
                 .withColumn("fraction", f.lit(ratio))
                 .rdd.collectAsMap())
    fracDf = sparkDf.stat.sampleBy(label, fractions, seed)
    # checkpoint the sample so the anti-join below sees exactly the same rows
    fracDf = fracDf.localCheckpoint()
    remainingDf = sparkDf.join(fracDf, on=joinOn, how="left_anti")
    return (fracDf, remainingDf)
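A hypothetical call, mirroring the earlier helper (it assumes the DataFrame already has a unique "unique_id" column and a label column "y"):
# Hypothetical usage: 80% stratified sample for training, the rest for testing
train_df, test_df = stratifiedSampler(df, ratio=0.8, label="y", joinOn="unique_id", seed=42)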
from pyspark.sql.functions import lit
list = [(2147481832,23355149,'v0'),(2147481832,973010692,'v3'),
(2147481832,2134870842,'v1'),(2147481832,541023347,'v3'),
(2147481832,1682206630,'v2'),(2147481832,1138211459,'v4'),
(2147481832,852202566,'v2'),(2147481832,201375938,'v5'),
(2147481832,486538879,'v3'),(2147481832,919187908,'v4'),
(214748183,919187908,'v3'),(214748183,91187908,'v4')]
df = spark.createDataFrame(list, ["x1","x2","x3"])
df = df.sampleBy("x3", fractions={'v1': 0.2, 'v2': 0.2, 'v3': 0.2,
                                  'v4': 0.2, 'v5': 0.2}, seed=0)
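Note that strata without an entry in fractions (here 'v0') are treated as fraction zero and dropped. To see what was kept from each stratum:
# Check how many rows were kept from each x3 stratum in the sample
df.groupBy("x3").count().show()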