如何配置芹菜任务



我在一个项目中使用Celery,将其用作调度程序(作为周期性任务)。

我的芹菜任务看起来像:

@periodic_task(run_every=timedelta(seconds=300))
def update_all_feed():
    feed_1()
    feed_2()
    ...........
    feed_n()

但是,随着订阅源数量的增加,到达其他订阅源需要很长时间(例如,当Celery使用订阅源编号n时,到达下一个订阅源需要较长时间(n+1)。我想使用Celery的并发性来启动多个提要。

在浏览了文档后,我发现我可以调用芹菜任务,如下所示:

feed.delay()

我如何配置芹菜,使其获得所有的提要ID并聚合它们(例如,一次5个提要)?我意识到要实现这一点,我必须运行Celery作为守护进程。

注意:我使用mongodb作为代理,我所做的只是安装它并在Celery的配置中添加url。

您可以像这个一样安排所有提要

@periodic_task(run_every=timedelta(seconds=300))
def update_all_feed():
    feed_1.delay()
    feed_2.delay()
    .......
    feed_n.delay()

或者你可以使用一个组来简化它

from celery import group
@periodic_task(run_every=timedelta(seconds=300))
def update_all_feed():
    group(feed.delay(i) for i in range(10))

现在要运行任务,您可以启动一个工人来执行任务

celery worker -A your_app -l info --beat

这开始每五分钟执行一次任务。然而,默认的并发性等于cpu的核心。您也可以更改并发性。如果你想同时执行10个任务,那么

celery worker -A your_app -l info --beat -c 10

来自Celery文档:

from celery.task.sets import TaskSet
from .tasks import feed, get_feed_ids
job = TaskSet(tasks=[
        feed.subtask((feed_id,)) for feed_id in get_feed_ids()
    ])
result = job.apply_async()
results = result.join() # There's more in the documentation

相关内容

  • 没有找到相关文章

最新更新