我每15分钟运行一次芹菜调度程序,我需要从API获取数据(速率限制= 300请求= 300请求/min max(并将结果存储到数据库中。我想同时在同一时间限制速率限制的情况下获取URL。如果有任何工人在这里失败,我不想重试,因为我会在15分钟内再次ping。关于如何在芹菜中完成的任何建议。
@celery.task(bind=True)
def fetch_store(self):
start = time()
return c.chain(c.group(emap.s() for _ in range(2000)), ereduce.s(start)).apply_async()
@celery.task(rate_limit='300/m')
def fetch():
#... requests data from external API
return data
@celery.task
def store(numbers, start):
end = time()
logger.info("Received" + numbers + " " + (end - start)/1000 + "seconds")
我通常定义一个自定义Task
子类,并将max_retries
设置为0
(不是None
,这使其永远重试(:
class NoRetryTask(Task):
max_retries = 0
...
您也可以以这样的装饰方式在一行中进行:
@app.task(max_retries=0)
def my_func():
...
有关更多信息,请参见文档。