在UDF中出现异常时修改不同的Pyspark列



我有一个数据帧和一个函数,我想在数据帧中的每个单元格上运行:

def foo(x):
# does stuff to x
return x
foo_udf = udf(lambda x: foo(x), StringType())
df = df.withColumn("col1", foo_udf(col("col1")))
.withColumn("col2", foo_udf(col("col2")))
.withColumn("col3", foo_udf(col("col3")))

它只是修改传入的数据,并返回一个新值来替换传入的值。

然而,在某些情况下可能会发生错误,对于这些情况,我有另一列col4,它将存储该行的udf是否失败的布尔值。

我的问题是,当这种情况发生时,我无法访问给定行的col4

您可以使用mapPartition在分区级别上执行此操作。我将使用Fugue,它将提供一个更容易的界面来将此应用于Spark。

首先进行一些设置:

from typing import List, Dict, Any, Iterable
import pandas as pd
def foo(x):
if x == "E":
raise ValueError()
return x + "_ran"
def logic(df: List[Dict[str,Any]]) -> List[Dict[str,Any]]:
for row in df:
try:
x = foo(row["col1"])
y = foo(row["col2"])
z = foo(row["col3"])
# if it reaches here, we can update all
row["col1"] = x
row["col2"] = y
row["col3"] = z
row["col4"] = False
except:
row["col4"] = True
return df

foo()是您的原始函数,logic()是一个包装器,仅当每次foo()调用成功时才更新列。对函数进行注释将指导Fugue应用转换。从这里我们可以使用Fugue的transform()对熊猫进行测试。

df = pd.DataFrame({"col1": ["A", "B", "C"], "col2": ["A", "B", "C"], "col3": ["D", "E", "F"]})
from fugue import transform
transform(df, logic, schema="*, col4:boolean")

该模式是Spark操作所必需的。这只是一个最小的表达式,然后Fugue处理它,然后我们得到一个结果:

col1    col2    col3    col4
A_ran   A_ran   D_ran   False
B       B       E       True
C_ran   C_ran   F_ran   False

所以我们可以把它带到Spark。我们只需要提供一个SparkSession。

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(df)
transform(sdf, logic, schema="*, col4:boolean", engine=spark).show()

您只能从UDF返回/更改单个列。但是,此列可以是StructType,包含有效负载和错误标志。然后你可以"打开包装";将结构列划分为两个(或多个(正常列。

from pyspark.sql import functions as F
from pyspark.sql import types as T
#some testdata
data = [['A', 4],
['B', 2],
['C', 5]]
df=spark.createDataFrame(data, ["id", "col1"])
#the udf
def foo(x):
if x == 5:
error=True
else:
error=False
return [x, error]
foo_udf = F.udf(lambda x: foo(x), returnType = T.StructType([
T.StructField("x", T.StringType(), False),
T.StructField("error", T.BooleanType(), False)
]))
#calling the udf and unpacking the return values
df.withColumn("col1", foo_udf("col1")) 
.withColumn("error", F.col("col1.error")) 
.withColumn("col1", F.col("col1.x")) 
.show()

输出:

+---+----+-----+
| id|col1|error|
+---+----+-----+
|  A|   4|false|
|  B|   2|false|
|  C|   5| true|
+---+----+-----+

最新更新