我在一个项目中使用Celery,将其用作调度程序(作为周期性任务)。
我的芹菜任务看起来像:
@periodic_task(run_every=timedelta(seconds=300))
def update_all_feed():
feed_1()
feed_2()
...........
feed_n()
但是,随着订阅源数量的增加,到达其他订阅源需要很长时间(例如,当Celery使用订阅源编号n时,到达下一个订阅源需要较长时间(n+1)。我想使用Celery的并发性来启动多个提要。
在浏览了文档后,我发现我可以调用芹菜任务,如下所示:
feed.delay()
我如何配置芹菜,使其获得所有的提要ID并聚合它们(例如,一次5个提要)?我意识到要实现这一点,我必须运行Celery作为守护进程。
注意:我使用mongodb作为代理,我所做的只是安装它并在Celery的配置中添加url。
您可以像这个一样安排所有提要
@periodic_task(run_every=timedelta(seconds=300))
def update_all_feed():
feed_1.delay()
feed_2.delay()
.......
feed_n.delay()
或者你可以使用一个组来简化它
from celery import group
@periodic_task(run_every=timedelta(seconds=300))
def update_all_feed():
group(feed.delay(i) for i in range(10))
现在要运行任务,您可以启动一个工人来执行任务
celery worker -A your_app -l info --beat
这开始每五分钟执行一次任务。然而,默认的并发性等于cpu的核心。您也可以更改并发性。如果你想同时执行10个任务,那么
celery worker -A your_app -l info --beat -c 10
来自Celery文档:
from celery.task.sets import TaskSet
from .tasks import feed, get_feed_ids
job = TaskSet(tasks=[
feed.subtask((feed_id,)) for feed_id in get_feed_ids()
])
result = job.apply_async()
results = result.join() # There's more in the documentation