编写一个像Python任何功能一样函数的Pyspark UDF



我想编写一个 any_lambda函数,该函数检查 ArrayType列中的任何元素是否符合lambda函数指定的条件。

这是我没有工作的代码:

def any_lambda(f, l):
    return any(list(map(f, l)))
spark.udf.register("any_lambda", any_lambda)
source_df = spark.createDataFrame(
    [
        ("jose", [1, 2, 3]),
        ("li", [4, 5, 6]),
        ("luisa", [10, 11, 12]),
    ],
    StructType([
        StructField("name", StringType(), True),
        StructField("nums", ArrayType(StringType(), True), True),
    ])
)
actual_df = source_df.withColumn(
    "any_num_greater_than_5",
    any_lambda(lambda n: n > 5, col("nums"))
)

此代码提高TypeError: Column is not iterable

如何创建有效的any_lambda函数?

udf期望参数为列, lambda函数不是列;您可能要做的是定义any_lambda,以便采用lambda功能并返回udf

import pyspark.sql.functions as F
def any_lambda(f):
    @F.udf
    def temp_udf(l):
        return any(map(f, l))
    return temp_udf
source_df = spark.createDataFrame(
    [
        ("jose", [1, 2, 3]),
        ("li", [4, 5, 6]),
        ("luisa", [10, 11, 12]),
    ],
    StructType([
        StructField("name", StringType(), True),
        StructField("nums", ArrayType(IntegerType(), True), True),
    ])
)
actual_df = source_df.withColumn(
    "any_num_greater_than_5",
    any_lambda(lambda n: n > 5)(col("nums"))
)
actual_df.show()
+-----+------------+----------------------+
| name|        nums|any_num_greater_than_5|
+-----+------------+----------------------+
| jose|   [1, 2, 3]|                 false|
|   li|   [4, 5, 6]|                  true|
|luisa|[10, 11, 12]|                  true|
+-----+------------+----------------------+

或@powers评论,要明确有关返回的列类型,我们可以在udf中指定返回的类型,例如:

def any_lambda(f):
    def temp_udf(l):
        return any(map(f, l))
    return F.udf(temp_udf, BooleanType())

现在的模式看起来像:

actual_df.printSchema()
root
 |-- name: string (nullable = true)
 |-- nums: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- any_num_greater_than_5: boolean (nullable = true)

相关内容

  • 没有找到相关文章

最新更新