我们可以在 UDF 中使用关键字参数吗?



我的问题是,我们是否可以在 Pyspark 中使用关键字参数和 UDF,就像我在下面所做的那样。 conv 方法有一个关键字参数conv_type默认情况下分配给特定类型的格式化程序,但是我想在某些地方指定不同的格式。由于关键字参数,这在udf中无法通过。在这里使用关键字参数有不同的方法吗?

from datetime import datetime as dt, timedelta as td,date
tpid_date_dict = {'69': '%d/%m/%Y', '62': '%Y/%m/%d', '70201': '%m/%d/%y', '66': '%d.%m.%Y', '11': '%d-%m-%Y', '65': '%Y-%m-%d'}
def date_formatter_based_on_id(column, date_format):
val = dt.strptime(str(column),'%Y-%m-%d').strftime(date_format)
return val
def generic_date_formatter(column, date_format):
val = dt.strptime(str(column),date_format).strftime('%Y-%m-%d')
return val
def conv(column, id, conv_type=date_formatter_based_on_id):
try:
date_format=tpid_date_dict[id]
except KeyError as e:
print("Key value not found!")
val = None
if column:
try:
val = conv_type(column, date_format)
except Exception as err:
val = column
return val
conv_func = functions.udf(conv, StringType())
date_formatted = renamed_cols.withColumn("check_in_std", 
conv_func(functions.col("check_in"), functions.col("id"), 
generic_date_formatter))

所以问题出在最后一个语句(date_formatted = renamed_cols.withColumn("check_in_std"(, conv_func(functions.col("check_in"(, functions.col("id"(, generic_date_formatter(((由于第三个参数generic_date_formatter是关键字参数。

尝试此操作时,我收到以下错误: 属性错误:"函数"对象没有属性"_get_object_id">

不幸的是,您不能将udf与关键字参数一起使用。UserDefinedFunction.__call__仅使用位置参数定义:

def __call__(self, *cols):
judf = self._judf
sc = SparkContext._active_spark_context
return Column(judf.apply(_to_seq(sc, cols, _to_java_column)))

但是您遇到的问题与关键字参数并不真正相关。你会得到异常,因为generic_date_formatter不是一个Column对象,而是一个函数。

您可以动态创建udf

def conv(conv_type=date_formatter_based_on_id):
def _(column, id):
try:
date_format=tpid_date_dict[id]
except KeyError as e:
print("Key value not found!")
val = None
if column:
try:
val = conv_type(column, date_format)
except Exception as err:
val = column
return val
return udf(_, StringType())

可以称为:

conv_func(generic_date_formatter)(functions.col("check_in"), functions.col("id"))

有关详细信息,请查看withColumn下的数据框列和外部列表传递到udf。

最新更新