可靠地发送有关Celery任务成功/失败的邮件



我目前正在使用Flask和Celery为生物信息学web服务开发模板。该模板显示了用户如何创建任务并在Celery工作程序上运行这些任务。

此类服务中的一个常见要求是在任务成功或失败时通知用户。我用Flask Mail发邮件。我的第一次尝试是:

@celery.task(name='app.expensive_greet', bind=True)
def expensive_greet(self, person, total, email):
this_task_id = expensive_greet.request.id
try:
for i in range(total):
time.sleep(0.1)
self.update_state(state='PROGRESS',
meta={'current': i, 'total': total})
if email:
send_success_mail(email, this_task_id)
return 'Greetings, {}!'.format(person)
except SoftTimeLimitExceeded:
if email:
send_failure_mail(email, this_task_id)
return 'Greetings, fallback!'
except Exception:
if email:
send_failure_mail(email, this_task_id)

正如您所看到的,有很多重复的代码。我想知道是否有可能在自定义的CeleryTask中隔离邮件处理,结果是:

class MailBase(celery.Task):
abstract = True
def on_success(self, res, task_id, args, kwargs):
_, _, email = args
if email:
send_success_mail(email, task_id)
def on_failure(self, exc, task_id, args, kwargs, einf):
_, _, email = args
if email:
send_failure_mail(email, task_id)

以及为该任务设置CCD_ 2。我不喜欢这个解决方案,因为:

  1. 它仍然有重复的if,但我可以忍受
  2. 如果用户更改任务函数的参数,他们还必须在两个不同的地方修改MailBase
  3. 函数参数的解包很难看
  4. 由于这是一个主要面向非技术用户的模板,因此简单性很重要

有更好的方法吗?理想情况下,我希望邮件是与任务一起发送的元数据,而不是任务函数的参数,并且能够在不接触任务本身的情况下插入邮件功能。

提前感谢!

如果您只想在任务失败时得到通知,您可以使用芹菜内置的电子邮件机制。

此外,如果你仍然想坚持自己的方式,你可以尝试使用decorator来包装与电子邮件相关的操作。

import functools

def send_emails(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
this_task_id = self.request.id
email = kwargs.pop('email', False)  # get email and remove it from kwargs
try:
ret = func(self, *args, **kwargs)
except NotifyException as ex:
if email:
send_failure_mail(email, this_task_id)
return ex.value
except Exception:
if email:
send_failure_mail(email, this_task_id)
# It would be better to raise again to allow celery knows the task has failed
raise
else:
if email:
send_success_mail(mail, this_task_id)
return ret
return wrapper

NotifyException是针对发生错误的情况而引入的,但用户并不将其视为失败,只是想发送电子邮件。

class NotifyException(Exception):
'''
This exception would be handled by send_emails decorator, the decorator will
catch it and return its value to outer.
'''
def __init__(self, value):
self.value = value
super(NotifyException, self).__init__(value)

请注意,最好在配置文件中保留email参数。

现在任务方法将像这个一样改变

@celery.task(name='app.expensive_greet', bind=True)
@send_emails
def expensive_greet(self, person, total):
try:
for i in range(total):
time.sleep(0.1)
self.update_state(state='PROGRESS',
meta={'current': i, 'total': total})
return 'Greetings, {}!'.format(person)
except SoftTimeLimitExceeded:
# Note that raise this exception to allow the decorator catch it
# and return the value of the exception
raise NotifyException('Greetings, fallback!')

希望它能有所帮助!

相关内容

  • 没有找到相关文章

最新更新