我想编写一个 any_lambda
函数,该函数检查 ArrayType
列中的任何元素是否符合lambda函数指定的条件。
这是我没有工作的代码:
def any_lambda(f, l):
return any(list(map(f, l)))
spark.udf.register("any_lambda", any_lambda)
source_df = spark.createDataFrame(
[
("jose", [1, 2, 3]),
("li", [4, 5, 6]),
("luisa", [10, 11, 12]),
],
StructType([
StructField("name", StringType(), True),
StructField("nums", ArrayType(StringType(), True), True),
])
)
actual_df = source_df.withColumn(
"any_num_greater_than_5",
any_lambda(lambda n: n > 5, col("nums"))
)
此代码提高TypeError: Column is not iterable
。
如何创建有效的any_lambda
函数?
udf期望参数为列, lambda
函数不是列;您可能要做的是定义any_lambda
,以便采用lambda功能并返回udf
:
import pyspark.sql.functions as F
def any_lambda(f):
@F.udf
def temp_udf(l):
return any(map(f, l))
return temp_udf
source_df = spark.createDataFrame(
[
("jose", [1, 2, 3]),
("li", [4, 5, 6]),
("luisa", [10, 11, 12]),
],
StructType([
StructField("name", StringType(), True),
StructField("nums", ArrayType(IntegerType(), True), True),
])
)
actual_df = source_df.withColumn(
"any_num_greater_than_5",
any_lambda(lambda n: n > 5)(col("nums"))
)
actual_df.show()
+-----+------------+----------------------+
| name| nums|any_num_greater_than_5|
+-----+------------+----------------------+
| jose| [1, 2, 3]| false|
| li| [4, 5, 6]| true|
|luisa|[10, 11, 12]| true|
+-----+------------+----------------------+
或@powers评论,要明确有关返回的列类型,我们可以在udf
中指定返回的类型,例如:
def any_lambda(f):
def temp_udf(l):
return any(map(f, l))
return F.udf(temp_udf, BooleanType())
现在的模式看起来像:
actual_df.printSchema()
root
|-- name: string (nullable = true)
|-- nums: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- any_num_greater_than_5: boolean (nullable = true)