方案:
我在芹菜上创建了一个用于测试目的的共享_task [兔子作为排队消息的经纪人]:
@app.task(bind=True, max_retries = 5, base=MyTask)
def testing(self):
try:
raise smtplib.SMTPException
except smtplib.SMTPException as exc:
print 'This is it'
self.retry(exc=exc, countdown=2)
#Overriding base class of Task
class MyTask(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
print "MyTask on failure world"
pass
我通过输入命令 testing.delay()在创建工人后10次调用了用于测试的任务。我只是通过按CTRL C删除服务器,然后从RabbitMQ服务器中删除所有这些队列。我再次启动了服务器。
服务器启动命令: celery worker --app=my_app.settings -l DEBUG
delete''queue命令: rabbitmqadmin delete queue name=<queue_name>
删除工人命令: ps auxww | grep 'celery worker' | awk '{print $2}' | xargs kill -9
问题:
由于我已经从RabbitMQ服务器中删除了所有队列,因此现在只能收到新的任务。但是我仍在完成旧的任务,此外,列表中没有出现新任务。这是什么原因是什么?
正在发生的事情是您的工人承担多个任务,除非您在启动Worker时具有-Ofair
标志
https://medium.com/@taylorhughes/three-quick-tips-from-two-two-with-celery-celery-c05ff9d7f9eb
因此,即使您清除了队列,除非您杀死工作过程本身,否则您的工人仍然会运行它的任务。
编辑添加
如果重新启动后运行任务,则需要撤销该任务。
http://celery.readthedocs.io/en/latest/faq.html#can-i-cancel-cancel-the-execution-of-a-task