将函数应用于Spark DataFrame中的所有单元格



我正在尝试将一些Pandas代码转换为Spark进行扩展。myfunc是一个复杂的API的包装器,它接受一个字符串并返回一个新字符串(这意味着我不能使用矢量化函数)。

def myfunc(ds):
for attribute, value in ds.items():
value = api_function(attribute, value)
ds[attribute] = value
return ds
df = df.apply(myfunc, axis='columns')

myfunc获取一个数据系列,将其分解为单独的单元格,为每个单元格调用API,并构建具有相同列名的新数据系列。这样可以有效地修改DataFrame中的所有单元格。

我是Spark的新手,我想使用pyspark来翻译这个逻辑。我已经将我的熊猫DataFrame转换为Spark:

spark = SparkSession.builder.appName('My app').getOrCreate()
spark_schema = StructType([StructField(c, StringType(), True) for c in df.columns])
spark_df = spark.createDataFrame(df, schema=spark_schema)

这就是我迷路的地方。我需要UDF还是pandas_udf?如何使用myfunc遍历所有单元格并为每个单元格返回一个新字符串?spark_df.foreach()不返回任何内容,也没有map()函数。

如有必要,我可以将myfuncDataSeries->DataSeries修改为string->string

选项1:一次对一列使用UDF

最简单的方法是重写函数,将字符串作为参数(这样它就是string->string)并使用UDF。这里有一个很好的例子。这一操作一次只适用于一列。因此,如果您的DataFrame有合理数量的列,您可以一次将UDF应用于每一列:

from pyspark.sql.functions import col
new_df = df.select(udf(col("col1")), udf(col("col2")), ...)

示例

df = sc.parallelize([[1, 4], [2,5], [3,6]]).toDF(["col1", "col2"])
df.show()
+----+----+
|col1|col2|
+----+----+
|   1|   4|
|   2|   5|
|   3|   6|
+----+----+
def plus1_udf(x):
return x + 1
plus1 = spark.udf.register("plus1", plus1_udf)
new_df = df.select(plus1(col("col1")), plus1(col("col2")))
new_df.show()
+-----------+-----------+
|plus1(col1)|plus1(col2)|
+-----------+-----------+
|          2|          5|
|          3|          6|
|          4|          7|
+-----------+-----------+

选项2:一次映射整个DataFrame

map可用于ScalaDataFrame,但目前PySpark中没有。较低级别的RDD API在PySpark中有一个map函数。因此,如果您有太多的列,无法一次转换一个,您可以对DataFrame中的每个单元格进行如下操作:

def map_fn(row):
return [api_function(x) for (column, x) in row.asDict().items()
column_names = df.columns
new_df = df.rdd.map(map_fn).toDF(df.columns)

示例

df = sc.parallelize([[1, 4], [2,5], [3,6]]).toDF(["col1", "col2"])
def map_fn(row):
return [value + 1 for (_, value) in row.asDict().items()]
columns = df.columns
new_df = df.rdd.map(map_fn).toDF(columns)
new_df.show()
+----+----+
|col1|col2|
+----+----+
|   2|   5|
|   3|   6|
|   4|   7|
+----+----+

上下文

foreach的文档只给出了打印的例子,但我们可以通过查看代码来验证它确实没有返回任何内容。

您可以在本文中阅读有关pandas_udf的内容,但它似乎最适合矢量化函数,正如您所指出的,由于api_function,您无法使用矢量化函数。

解决方案是:

udf_func = udf(func, StringType())
for col_name in spark_df.columns:
spark_df = spark_df.withColumn(col_name, udf_func(lit(col_name), col_name))
return spark_df.toPandas()

有三个关键的见解帮助我弄清楚了这一点:

  1. 如果将withColumn与现有列的名称(col_name)一起使用,Spark将"覆盖"/隐藏原始列。这本质上提供了直接编辑列的外观,就好像它是可变的一样
  2. 通过在原始列之间创建一个循环并重用相同的DataFrame变量spark_df,我使用相同的原理来模拟可变的DataFrame,创建一个逐列转换链,每次"覆盖"一列(根据#1-见下文)
  3. SparkUDFs期望所有参数都是Column类型,这意味着它尝试解析每个参数的列值。由于api_function的第一个参数是一个文字值,该值对于向量中的所有行都是相同的,因此必须使用lit()函数。只需将col_name传递给函数,就会尝试提取该列的列值。据我所知,通过col_name相当于通过col(col_name)

假设有3列"a"、"b"one_answers"c",展开此概念将如下所示:

spark_df = spark_df.withColumn('a', udf_func(lit('a'), 'a')
.withColumn('b', udf_func(lit('b'), 'b')
.withColumn('c', udf_func(lit('c'), 'c')

相关内容

  • 没有找到相关文章

最新更新