我有一个函数可以进行一些数据处理,例如:
from typing import Optional
from pyspark.sql import DataFrame
def standard_date_formatter(
    df: DataFrame,
    prev_name: str,
    prev_fmt: str,
    next_name: Optional[str] = None,
    next_fmt: str = "yyyy-MM-dd",
):
    """Reformat the date column *prev_name* (currently in *prev_fmt*).

    The result is written to *next_name* (defaults to overwriting the
    source column) using the *next_fmt* pattern.  This is a stub: the
    actual transformation is omitted in the example.
    """
    return None
我有另一个验证参数的函数,我想将其作为包装器,以便将其与我编写的其他数据转换函数一起使用。但是,我不得不重复包装器中的参数。现在我有
from functools import wraps
import pyspark.sql.functions as f
from pyspark.sql.types import DateType, StringType
def validate_old_new(fn):
    """Validate the source/target column arguments before calling *fn*.

    Checks that the column to convert exists and is StringType or
    DateType, defaults ``next_name`` to ``prev_name``, and casts a
    DateType source column to string so *fn* always sees a string column.
    """
    @wraps(fn)
    def wrapper(
        df: DataFrame,
        prev_name: str,
        prev_format: str,
        next_name: Optional[str] = None,
        next_format: str = "yyyy-MM-dd",
    ):
        # Existence check: a missing column surfaces as KeyError on the schema.
        try:
            source_type = df.schema[prev_name].dataType
        except KeyError:
            raise ValueError("Column to be converted does not exist.")
        # Only string/date columns are convertible.
        if source_type not in (StringType(), DateType()):
            raise AttributeError(
                "Column to be converted must be StringType or DateType"
            )
        # In-place rename when no target column was requested.
        next_name = next_name or prev_name
        # Normalize DateType sources to strings before handing off.
        if source_type == StringType():
            prepared = df
        else:
            prepared = df.withColumn(prev_name, f.col(prev_name).cast(StringType()))
        return fn(prepared, prev_name, prev_format, next_name, next_format)
    return wrapper
@validate_old_new
def date_formatter(df, prev_name, prev_format, next_name=None, next_format="yyyy-MM-dd"):
    ...  # 实现同上
有没有办法避免在包装器中重复参数?如果没有,是否有一种推荐的、符合 Python 风格(Pythonic)的方法,来避免在"辅助函数"(在我的情况下是数据转换函数)中重复这些参数?
您可以使用 *args 和 **kwargs 来避免重复的参数定义,例如:
import functools
def validate_old_new(fn):
    """Generic validating decorator that does not repeat *fn*'s signature.

    The wrapper accepts ``*args``/``**kwargs``, pulls out only the values
    it needs for validation, and then forwards the original arguments to
    *fn* untouched.
    """
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # ``df`` is the first positional argument, but support keyword
        # passing too (args[0] alone would raise IndexError in that case).
        df = args[0] if args else kwargs["df"]
        next_name = kwargs.get('next_name')
        # Validation process...
        # Forward the caller's arguments unchanged.  (``fn(...)`` would
        # call fn with a literal Ellipsis instead of the real arguments.)
        return fn(*args, **kwargs)
    return wrapper
@validate_old_new
def date_formatter(
    df: DataFrame,
    prev_name: str,
    prev_format: str,
    next_name: Optional[str] = None,
    next_format: str = "yyyy-MM-dd",
):
    """Reformat *prev_name* from *prev_format* into *next_format*.

    Stub: the transformation body is omitted in this example; validation
    is handled by the ``validate_old_new`` decorator.
    """
    return None
# or
@validate_old_new
def date_formatter(*args, **kwargs):
    """Same stub, with the signature itself collapsed to *args/**kwargs."""
    return None