具有相同名称的芹菜任务



我正在实现一个文件树遍历脚本,它根据文件扩展名或目录名调用任务。

我的行代码是这样的:

from celery import Celery
import os

app = Celery('tasks', broker='amqp://guest@localhost//')
app.conf.update(
    CELERY_DEFAULT_EXCHANGE = 'path_walker',
    CELERY_DEFAULT_EXCHANGE_TYPE = 'topic',
)   
for root, dirs, files in os.walk("/"):
    for filename in files:
        name, ext = os.path.splitext(filename)
        if ext:
            app.send_task("process", args=(os.path.join(root, filename),), routing_key="file" + ext)
    for dirname in dirs:
        app.send_task("process", args=(dirname,), routing_key="directory." + dirname)

您可以看到我调用了相同的任务(process),但使用不同的routing_key s。

在我的worker中,我有:

from celery import Celery
from kombu import Queue, Exchange
import uuid
app = Celery(broker='amqp://guest@localhost//')
file_queue = Queue(str(uuid.uuid4()), routing_key="file.py")
dir_queue = Queue(str(uuid.uuid4()), routing_key="directory.tmp")
app.conf.update(
    CELERY_DEFAULT_EXCHANGE="path_walker",
    CELERY_DEFAULT_EXCHANGE_TYPE="topic",
    CELERY_QUEUES=(
        dir_queue,
        file_queue,
    ),
)

@app.task(name="process", ignore_result=True, queue=dir_queue)
def process_dir(dir_name):
    print("Found a tmp dir: {}".format(dir_name))

@app.task(name="process", ignore_result=True, queue=file_queue)
def process_file(file_name):
    print("Found a python file: {}".format(file_name))

上面的代码创建了两个具有不同路由键的队列。然后这两个任务绑定到单独的队列,但是当我运行树漫步器时,只有第二个任务(process_file函数)被调用。

是否有可能具有相同名称但在不同队列上的任务,由相同的worker运行?或者,如果我想坚持这种方法,我是否需要每个工人只执行一个任务?

回答我自己的问题:

在一个芹菜应用程序中不可能有两个具有相同名称的任务。我可以将上面的两个单独的应用程序分开,并使用不同的worker运行它们,或者为任务提供唯一的名称。

芹菜的关键代码在这里:

https://github.com/celery/celery/blob/8455b0c56797c22ba52abf59f4467ccc19eb9d20/celery/app/base.py

相关内容

  • 没有找到相关文章

最新更新