如何创建一个python装饰器,其参数是被装饰函数加上任意参数



我以前创建过包装函数的装饰器,但在本例中,我不需要包装,所以我猜我使用了错误的范例,所以也许有人可以帮助我弄清楚并解决我的最终目标。

我想象的是一个装饰器,当调用时(在编译时),它需要3个参数:

  • 修饰函数(驻留在Model类中)
  • 类的数据成员的名称(即数据库字段,例如CharField类型的name字段)
  • 类中父关键数据成员的名称(例如ForeignKey类型的parent)

我的装饰器代码将在一个全局列表变量中注册函数、字段和与之相关的键。

我将有一个类,继承自覆盖save()delete()的模型。它将循环遍历全局列表,使用函数的输出更新相关字段,然后调用父模型的.save()方法,以便它也会更新其装饰字段。

我很快意识到,尽管装饰器没有传递具有装饰器的函数,因为当编译期间没有字段或父键提供给装饰器时,我会得到一个我创建的异常。

如果这不是很清楚,这里是我的代码:

updater_list: Dict[str, List] = {}

def field_updater_function(fn, update_field_name=None, parent_field_name=None):
"""
This is a decorator for functions in a Model class that are identified to be used to update a supplied field and
fields of any linked parent record (if the record is changed).  The function should return a value compatible with
the field type supplied.  These decorators are identified by the MaintainedModel class, whose save and delete
methods override the parent model and call the given functions to update the supplied field.  It also calls linked
dependent models (if supplied) update methods.
"""
if update_field_name is None and parent_field_name is None:
raise Exception(
"Either an update_field_name or parent_field_name argument is required."
)
# Get the name of the class the function belongs to
class_name = fn.__qualname__.split(".")[0]
func_dict = {
"function": fn.__name__,
"update_field": update_field_name,
"parent_field": parent_field_name,
}
if class_name in updater_list:
updater_list[class_name].append(func_dict)
else:
updater_list[class_name] = [func_dict]
if settings.DEBUG:
print(f"Added field_updater_function decorator to function {fn.__qualname__}")
return fn

class MaintainedModel(Model):
"""
This class maintains database field values for a django.models.Model class whose values can be derived using a
function.  If a record changes, the decorated function is used to update the field value.  It can also propagate
changes of records in linked models.  Every function in the derived class decorated with the
`@field_updater_function` decorator (defined above, outside this class) will be called and the associated field
will be updated.  Only methods that take no arguments are supported.  This class overrides the class's save and
delete methods as triggers for the updates.
"""
def save(self, *args, **kwargs):
# Set the changed value triggering this update
super().save(*args, **kwargs)
# Update the fields that change due to the above change (if any)
self.update_decorated_fields()
# Now save the updated values (i.e. save again)
super().save(*args, **kwargs)
# Percolate changes up to the parents (if any)
self.call_parent_updaters()
def delete(self, *args, **kwargs):
# Delete the record triggering this update
super().delete(*args, **kwargs)  # Call the "real" delete() method.
# Percolate changes up to the parents (if any)
self.call_parent_updaters()
def update_decorated_fields(self):
"""
Updates every field identified in each field_updater_function decorator that generates its value
"""
for updater_dict in self.get_my_updaters():
update_fun = getattr(self, updater_dict["function"])
update_fld = updater_dict["update_field"]
if update_fld is not None:
setattr(self, update_fld, update_fun())
def call_parent_updaters(self):
parents = []
for updater_dict in self.get_my_updaters():
parent_fld = getattr(self, updater_dict["parent_field"])
if parent_fld is not None and parent_fld not in parents:
parents.append(parent_fld)
for parent_fld in parents:
parent_instance = getattr(self, parent_fld)
if isinstance(parent_instance, MaintainedModel):
parent_instance.save()
elif isinstance(parent_instance, ManyToManyField) and :
parent_instance.all().save()
else:
raise Exception(
f"Parent class {parent_instance.__class__.__name__} or {self.__class__.__name__} must inherit "
f"from {MaintainedModel.__name__}."
)
@classmethod
def get_my_updaters(cls):
"""
Convenience method to retrieve all the updater functions of the calling model.
"""
if cls.__name__ in updater_list:
return updater_list[cls.__name__]
else:
return []
class Meta:
abstract = True

这是我应用的第一个在编译时触发异常的修饰符:

class Tracer(models.Model, TracerLabeledClass):
id = models.AutoField(primary_key=True)
name = models.CharField(
max_length=256,
unique=True,
help_text="A unique name or lab identifier of the tracer, e.g. 'lysine-C14'.",
)
compound = models.ForeignKey(
to="DataRepo.Compound",
on_delete=models.RESTRICT,
null=False,
related_name="tracer",
)
class Meta:
verbose_name = "tracer"
verbose_name_plural = "tracers"
ordering = ["name"]
def __str__(self):
return str(self._name())
@field_updater_function("name", "infusates")
def _name(self):
# format: `compound - [ labelname,labelname,... ]` (but no spaces)
if self.labels is None or self.labels.count() == 0:
return self.compound.name
return (
self.compound.name
+ "-["
+ ",".join(list(map(lambda l: str(l), self.labels.all())))
+ "]"
)

和我的例外:

...
File ".../tracer.py", line 31, in Tracer
@field_updater_function("name")
File ".../maintained_model.py", line 19, in field_updater_function
raise Exception(
Exception: Either an update_field_name or parent_field_name argument is required.

基本思想是我们在数据库中有一堆字段,这些字段可以完全从数据库中的其他字段派生。我们最初使用cached_properties,但它们实际上没有提供加速,所以我们宁愿将计算值保存在数据库中。

我写了一个缓存机制,它使用。save和。delete重写自动刷新缓存,这工作得很好,但有各种缺点。

我们可以自定义代码覆盖.save(),显式调用函数来更新每个字段,但是我想要一些东西,使维护字段值的开销像对执行更新的函数应用装饰器一样简单,并且只提供它们计算值的字段和到层次结构中其他受影响字段的链接。如:

@field_updater_function("name", "infusates")
def _name(self):
...

除了装饰器,我还应该使用其他东西来完成这个任务吗?我可以使用functools.wraps制作一个虚拟装饰器,它只返回所提供的函数(我认为),但那只是感觉不对。

你需要创建一个装饰器工厂。也就是说,你调用一个带参数的函数,返回一个decorator函数,该decorator函数将被传递给要装饰的函数。

一个典型的方法是使用嵌套函数。在另一个函数中定义的函数可以访问封闭函数名称空间中的变量。我认为你的代码应该是这样的:
def field_updater_function(update_field_name=None, parent_field_name=None): # factory
# docstring omitted for brevity
if update_field_name is None and parent_field_name is None:
raise Exception(
"Either an update_field_name or parent_field_name argument is required."
)
def decorator(fn):                                         # the actual decorator
class_name = fn.__qualname__.split(".")[0]
func_dict = {
"function": fn.__name__,
"update_field": update_field_name,        # you can still access variables
"parent_field": parent_field_name,        # from the enclosing namespace
}
if class_name in updater_list:
updater_list[class_name].append(func_dict)
else:
updater_list[class_name] = [func_dict]
if settings.DEBUG:
print(f"Added field_updater_function decorator to function {fn.__qualname__}")
return fn
return decorator                          # here the factory returns the decorator

最新更新