我做了某种迷你框架,以便在rabbitmq
关闭的情况下能够捕获celery
的连接错误,它以更优雅的方式处理错误,并且除了使用send_task
外,它工作得很好。
这里有一些代码来阐明这个想法:
class MyBaseTask(base_task.Task):
""" Base Class to handle tasks from Me hohoho!"""
abstract = True
@classmethod
def delay(cls, *args, **kwargs):
"""Hook to catch connection errors"""
try:
return super(MyBaseTask, cls).apply_async(args, kwargs)
except socket.error as e:
cls._safe_failover() # a function to handle this error
cls.get_logger().error(str(e))
except Exception as e:
cls.get_logger().error("Uknown Error: %s" % str(e))
raise # normal exception
现在我MyBaseTask
类的子类:
class MyL33tTask(MyBaseTask):
name = 'task.my_leet_task'
def run(self, *args, **kwargs):
# yada yada
它将在发生套接字错误时执行safe_failover函数(又名,当rabbitmq
关闭时)。可悲的是,当我使用send_task('task.my_leet_task')
时不会发生这种情况,因为它使用某种未加载MyBaseTask
代理。
有没有一种简单的方法来覆盖send_task
以改用MyBaseTask
?
到目前为止,我了解您的逻辑的主要用途是确保您实际上正在向代理发送任务。
如果我的理解是正确的,那么你的方法可能是错误的,让我解释一下原因。 使用消息传递时,主要优点是您可以通过将消息传递给代理来调度任务,该方法实际上send_task他对任务本身一无所知,它只是为Celery编写一条格式良好的消息并将其发送到配置的代理(http://docs.celeryproject.org/en/latest/faq.html#can-i-call-a-task-by-name)。
考虑到这一点,很明显,"发送消息失败"的异常捕获应该在您调用 *send_message* 的地方处理。 延迟方法可以保持这种方式,但我建议更明确并保持实际调用 delay() 的捕获逻辑,因为如果任务未调度该怎么办不取决于任务,而取决于调度程序。