我正在向Celery中的web服务发布数据。有时,由于互联网关闭,数据不会发布到web服务,并且会无限次重试任务,直到发布为止。该任务的重试是不必要的,因为网络已关闭,因此不需要重试。
我想到了一个更好的解决方案,即如果一个任务失败三次(至少重试三次),那么它就会转移到另一个队列。此队列包含所有失败任务的列表。现在,当互联网启动,数据通过网络发布时,即任务已经从正常队列中完成,然后它开始处理任务失败的队列中的任务。这不会浪费CPU内存一次又一次地重试任务。
这是我的代码:-现在,我只是再次尝试任务,但我怀疑这是否是正确的方法。
@shared_task(default_retry_delay = 1 * 60, max_retries = 10)
def post_data_to_web_service(data,url):
try :
client = SoapClient(
location = url,
action = 'http://tempuri.org/IService_1_0/',
namespace = "http://tempuri.org/",
soap_ns='soap', ns = False
)
response= client.UpdateShipment(
Weight = Decimal(data['Weight']),
Length = Decimal(data['Length']),
Height = Decimal(data['Height']),
Width = Decimal(data['Width']) ,
)
except Exception, exc:
raise post_data_to_web_service.retry(exc=exc)
如何同时维护两个队列并尝试从两个队列执行任务。
设置.py
BROKER_URL = 'redis://localhost:6379/0'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
默认情况下,芹菜会将所有任务添加到名为celery
的队列中。所以你可以在这里运行你的任务,当发生异常时,它会重试,一旦达到最大重试次数,你就可以将它们转移到一个新的队列,比如foo
from celery.exceptions import MaxRetriesExceededError
@shared_task(default_retry_delay = 1 * 60, max_retries = 10)
def post_data_to_web_service(data,url):
try:
#do something with given args
except MaxRetriesExceededError:
post_data_to_web_service([data, url], queue='foo')
except Exception, exc:
raise post_data_to_web_service.retry(exc=exc)
当你启动你的工作人员时,这个任务将尝试用给定的数据做一些事情。如果失败,它将在60秒内重试10次。然后,当它遇到MaxRetriesExceededError
时,它将相同的任务发布到新队列foo
。
要使用这些任务,您必须启动一个新的工作
celery worker -l info -A my_app -Q foo
或者,如果你用启动它,你也可以从默认工作者那里消耗这个任务
celery worker -l info -A my_app -Q celery,foo