Row operations with a UDF


from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, StructType, StructField, IntegerType

# Define the schema for the input DataFrame
input_schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", IntegerType(), True)
])

# Define the UDF that accepts an entire row as input and performs operations using columns
@udf(returnType=StringType())
def my_udf(row):
    col1 = row.col1
    col2 = row.col2
    result = col1 + col2
    return str(result)

# Create a sample DataFrame
data = [(1, 2), (3, 4), (5, 6)]
df = spark.createDataFrame(data, schema=input_schema)

# Apply the UDF to the DataFrame
result_df = df.withColumn("result", my_udf(df))

I tried to run the code above, but I get the following error: TypeError: Invalid argument, not a string or column: DataFrame[col1: int, col2: int] of type . For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

Spark version: 3.3.1 on Databricks

Can anyone tell me what I am doing wrong? I have tried multiple permutations and combinations, but I cannot get it to work.

You cannot pass a whole DataFrame to a UDF; you can pass a struct of all the columns instead:

from pyspark.sql.functions import struct
result_df = df.withColumn("result", my_udf(struct([df[col] for col in df.columns])))
result_df.show()
+----+----+------+
|col1|col2|result|
+----+----+------+
|   1|   2|     3|
|   3|   4|     7|
|   5|   6|    11|
+----+----+------+
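For intuition about why this works: the struct column arrives inside the UDF as a pyspark.sql.Row, which behaves like a named tuple, so `row.col1` and `row.col2` resolve by field name. A minimal pure-Python sketch (no Spark session needed, using collections.namedtuple to stand in for the Row the UDF receives) of the same logic:

```python
from collections import namedtuple

# Stand-in for the pyspark.sql.Row produced by struct(col1, col2)
Row = namedtuple("Row", ["col1", "col2"])

def my_udf_logic(row):
    # Same body as the UDF: access fields by name, add, stringify
    return str(row.col1 + row.col2)

rows = [Row(1, 2), Row(3, 4), Row(5, 6)]
results = [my_udf_logic(r) for r in rows]
print(results)  # ['3', '7', '11']
```

As a shorthand, `struct(*df.columns)` accepts column names directly, so the list comprehension over `df[col]` is not strictly needed.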
