如何在Luigi for AWS Batch中正确覆盖decorator函数



我正在使用Luigi启动AWS批处理作业。我想创建luigi.contrib.batch.BatchTask的子类(Luigi for AWS Batch的文档(依赖于boto3(可以在这里找到:https://luigi.readthedocs.io/en/stable/_modules/luigi/contrib/batch.html#BatchTask)

类BatchTask:

class BatchTask(luigi.Task):
"""
Base class for an Amazon Batch job
Amazon Batch requires you to register "job definitions", which are JSON
descriptions for how to issue the ``docker run`` command. This Luigi Task
requires a pre-registered Batch jobDefinition name passed as a Parameter
:param job_definition (str): name of pre-registered jobDefinition
:param job_name: name of specific job, for tracking in the queue and logs.
:param job_queue: name of job queue where job is going to be submitted.
"""
job_definition = luigi.Parameter()
job_name = luigi.OptionalParameter(default=None)
job_queue = luigi.OptionalParameter(default=None)
poll_time = luigi.IntParameter(default=POLL_TIME)
def run(self):
bc = BatchClient(self.poll_time)
job_id = bc.submit_job(
self.job_definition,
self.parameters,
job_name=self.job_name,
queue=self.job_queue)
bc.wait_on_job(job_id)

@property
def parameters(self):
"""Override to return a dict of parameters for the Batch Task"""
return {}

对于我的子类,我想覆盖parameters函数,这样我就可以使用代码传递我自己的参数,而不是依赖于AWS Web界面手动填充参数。这个parameters函数在函数上方有一个@property标记,这表明它是一个装饰器——这是我不太熟悉的东西。

我认为我的问题是不可知的,尽管是:

如何重写parameters函数,使其不返回空字典,而是返回我定义的字典?

书面子类:

import luigi.contrib.batch as batch
batch_job_revision_number=1
class SubclassLuigiBatchTask(batch.BatchTask):
job_definition='arn:aws:batch:{0}:job-definition/{1}:{2}'.format(
'aws-credentials-that-i-cannot-share',
'aws_pre-registered-job-description-name',
batch_job_revision_number)
job_name='my_example_job'
job_queue='my_example_queue'
poll_time = 10
task = batch.BatchTask(
job_definition='arn:aws:batch:{0}:job-definition/{1}:{2}'.format(
'aws-credentials-that-i-cannot-share',
'aws_pre-registered-job_description',
batch_job_revision_number),
job_name='my_example_job',
job_queue='my_example_queue',
poll_time=10
)
@task.parameters
def parameters(self):
return {
"job_definition": "df -h"
}
# This run function runs the predefined job (job definition)
def run(self):
self.task.run()
# function only runs if output doesn't exist yet
def output(self):
return LocalTarget('/home/some_user/task_completed.txt')

上面的例子返回错误:

TypeError: 'dict' object is not callable

如何访问setter函数来覆盖此默认return {}?我试着在命令行上查看哪些函数可用于task.parameters,查看哪些方法可用:

>>> dir(task.parameters)
['__class__', '__contains__', '__delattr__', '__delitem__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getitem__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__iter__', '__le__', '__len__', '__lt__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__setitem__', '__sizeof__', '__str__', '__subclasshook__', 'clear', 'copy', 'fromkeys', 'get', 'items', 'keys', 'pop', 'popitem', 'setdefault', 'update', 'values']

而不是

@task.parameters
def parameters(self):
...

你需要做:

@property
def parameters(self):
...

用decorator重写属性还不错。看看下面的例子:

class A:
def __init__(self):
self._a = "3"
@property
def a(self):
return self._a
class B(A):
def __init__(self):
super(B, self).__init__()
self._b = "4"
@property
def a(self):
return self._b
a = A()  
b = B()
print('a', a.a)
print('b', b.a)

这将打印出来:

a 3
b 4

我认为添加那个奇怪的装饰器@task.parameters可能会把它搞砸。可能真的在试图重现自我?非常奇怪的东西。

最新更新