在pyspark中以分布式方式高效地生成大型DataFrame(无需pyspark.sql.Row)



问题归结为以下几点:我想在pyspark中使用现有的并行输入集合和一个给定一个输入可以生成相对大批量行的函数来生成一个DataFrame。在下面的例子中,我想使用例如1000个执行器生成10^12行的数据帧:

def generate_data(one_integer):
import numpy as np
from pyspark.sql import Row
M = 10000000 # number of values to generate per seed, e.g. 10M
np.random.seed(one_integer)
np_array = np.random.random_sample(M) # generates an array of M random values
row_type = Row("seed", "n", "x")
return [row_type(one_integer, i, float(np_array[i])) for i in range(M)]
N = 100000 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = spark.sparkContext.parallelize(list_of_integers)
row_rdd = list_of_integers_rdd.flatMap(list_of_integers_rdd)
from pyspark.sql.types import StructType, StructField, FloatType, IntegerType
my_schema = StructType([
StructField("seed", IntegerType()),
StructField("n", IntegerType()),
StructField("x", FloatType())])
df = spark.createDataFrame(row_rdd, schema=my_schema)

(我真的不想研究给定种子的随机数的分布——这只是我能够想出的一个例子来说明大型数据帧不是从仓库加载的,而是由代码生成的情况)

上面的代码几乎完全符合我的要求。问题是,它以一种非常低效的方式来实现这一点——以为每一行创建一个pythonRow对象为代价,然后将python Row对象转换为内部Spark柱状表示。

有没有一种方法可以通过让spark知道这些是一批值的列来转换已经以列表示的一批行(例如,上面的np_array中的一个或几个numpy数组)?

例如,我可以编写代码来生成python集合RDD,其中每个元素都是pyarrow。RecordBatch或熊猫。DataFrame,但如果不在此过程中创建pyspark Row对象的RDD,我无法找到将其中任何一个转换为Spark DataFrame的方法。

至少有十几篇文章举例说明了如何使用pyarrow+panda高效地将本地(到驱动程序)panda数据帧转换为Spark数据帧,但这对我来说不是一个选项,因为我需要在执行器上以分布式方式实际生成数据,而不是在驱动程序上生成一个panda数据框架并将其发送给执行器。

UPD我找到了一种避免创建Row对象的方法——使用python元组的RDD。正如预期的那样,它仍然太慢,但仍然比使用Row对象快一点。不过,这并不是我真正想要的(这是一种从python向Spark传递柱状数据的非常有效的方式)。

还测量了在机器上进行某些操作的时间(粗略的方式,测量的时间有很大的变化,但在我看来仍然具有代表性):有问题的数据集是10M行,3列(一列是常数整数,另一列是从0到10M-1的整数范围,第三列是使用np.random.random_sample:生成的浮点值

  • 本地生成pandas数据帧(10M行):~440-450毫秒
  • 本地生成spark.sql.Row对象的python列表(10M行):~12-15s
  • 本地生成代表行的元组的python列表(10M行):~3.4-3.5s

仅使用1个执行器和1个初始种子值生成Spark数据帧:

  • 使用spark.createDataFrame(row_rdd, schema=my_schema):~70-80s
  • 使用spark.createDataFrame(tuple_rdd, schema=my_schema):~40-45s
  • (非分布式创建)使用spark.createDataFrame(pandas_df, schema=my_schema):~0.4-0.5s(没有熊猫df生成本身,这需要大致相同的时间)-spark.sql.execution.arrow.enabled设置为true

本地到驱动程序的pandas数据帧在~1s内转换为Spark数据帧(10M行)的例子让我有理由相信,在执行器中生成的数据帧也可能如此。然而,使用python元组的RDD,我现在能达到的10M行的最快速度是~40s。

因此,问题仍然存在——有没有一种方法可以在pyspark中以分布式方式高效地生成大型Spark数据帧?

听起来瓶颈是从RDD->数据帧的转换,而且手头的功能相当快,Panda DF通过pyarrow转换为spark DF也相当快。这里有两个潜在的解决方案:

  1. 由于并行创建pandas-df很容易,因此不用从执行器返回,而是使用df.to_parquet编写生成的df,即:
def generate_data(seed):
M = 10
np.random.seed(seed)
np_array = np.random.random_sample(M) # generates an array of M random values
df = pd.DataFrame(np_array, columns=["x"])
df["seed"] = seed
df.reset_index().to_parquet(f"s3://bucket/part-{str(seed).zfill(5)}.parquet"

在生成的镶木地板文件中进行Spark读取应该是微不足道的。然后,您的瓶颈变成IO限制,这应该比火花转换元组/行类型更快。

  1. 如果不允许将任何内容保存到文件中,则假设您的spark版本足够新,pandas_udfGROUPED_MAP可能会帮助您。它还使用pyarrow在spark DFs和pandas DFs之间进行转换,因此它应该比使用元组更快,并允许您以分布式方式从UDF创建和返回pandas DF
import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
N = 10
df = spark.createDataFrame(
[(i,) for i in range(N)], ["seed"]
)
def generate_data(seed):
M = 10
np.random.seed(seed)
np_array = np.random.random_sample(M) # generates an array of M random values
df = pd.DataFrame(np_array, columns=["x"])
df["seed"] = seed
return df.reset_index()
@pandas_udf("index long, x double, seed long", PandasUDFType.GROUPED_MAP)
def generate_data_udf(pdf):
output = []
for idx, row in pdf.iterrows():
output.append(generate_data(row["seed"]))
return pd.concat(output)

df.groupby("seed").apply(generate_data_udf).show()

较慢的部分将是groupby,您可能能够加快速度,这取决于您如何将种子批处理到generate_data_udf,即:

@udf(returnType=IntegerType())
def batch_seed(seed):
return seed // 10
df.withColumn("batch_seed", batch_seed(col("seed"))). 
groupBy("batch_seed").apply(generate_data_udf).show()

这里有一个解决方案,它不使用RDD或创建Rows,而只使用数据帧操作:
(代码在scala中,但在python中执行同样的操作应该很简单)

val N = 100000
//for seed return array of index and random_value
def generate_data(i: Int): Array[(Int, Double)] = ???
val generate_data_udf = udf (generate_data _)
spark
.range(N)
.toDF("seed")
.withColumn("arr", generate_data_udf($"seed"))
.select(
$"seed",
explode($"arr") as "exp"
)
.select(
$"seed",
$"exp._1" as "n",
$"exp._2" as "x"
)

以下是在不使用仅基于RDD的Row的情况下解决问题的方法。我认为这可能是最有效的方法,因为它使用map来计算函数输出,使用flatMap来组合这些输出——这两种操作都是在RDD上执行的,所以一切都应该是分布式的。

import numpy as np
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()
sc = spark.sparkContext
def generate_data(one_integer):
M = 2 # number of values to generate per seed, e.g. 10M
np.random.seed(one_integer)
np_array = np.random.random_sample(M) # generates an array of M random values
return [(one_integer, i, float(np_array[i])) for i in range(M)]
N = 30 # number of seeds to try, e.g. 100K
list_of_integers = [i for i in range(N)]
list_of_integers_rdd = sc.parallelize(list_of_integers)
generated_data_rdd = list_of_integers_rdd.map(lambda x: generate_data(x))
solved_rdd = generated_data_rdd.flatMap(lambda list: list)
df = spark.createDataFrame(solved_rdd).toDF("seed", "n", "x")
df.show()

最新更新