我正在实现一个文件树遍历脚本,它根据文件扩展名或目录名调用任务。
我的行代码是这样的:
from celery import Celery
import os
app = Celery('tasks', broker='amqp://guest@localhost//')
app.conf.update(
CELERY_DEFAULT_EXCHANGE = 'path_walker',
CELERY_DEFAULT_EXCHANGE_TYPE = 'topic',
)
for root, dirs, files in os.walk("/"):
for filename in files:
name, ext = os.path.splitext(filename)
if ext:
app.send_task("process", args=(os.path.join(root, filename),), routing_key="file" + ext)
for dirname in dirs:
app.send_task("process", args=(dirname,), routing_key="directory." + dirname)
您可以看到我调用了相同的任务(process
),但使用不同的routing_key
s。
在我的worker中,我有:
from celery import Celery
from kombu import Queue, Exchange
import uuid
app = Celery(broker='amqp://guest@localhost//')
file_queue = Queue(str(uuid.uuid4()), routing_key="file.py")
dir_queue = Queue(str(uuid.uuid4()), routing_key="directory.tmp")
app.conf.update(
CELERY_DEFAULT_EXCHANGE="path_walker",
CELERY_DEFAULT_EXCHANGE_TYPE="topic",
CELERY_QUEUES=(
dir_queue,
file_queue,
),
)
@app.task(name="process", ignore_result=True, queue=dir_queue)
def process_dir(dir_name):
print("Found a tmp dir: {}".format(dir_name))
@app.task(name="process", ignore_result=True, queue=file_queue)
def process_file(file_name):
print("Found a python file: {}".format(file_name))
上面的代码创建了两个具有不同路由键的队列。然后这两个任务绑定到单独的队列,但是当我运行树漫步器时,只有第二个任务(process_file
函数)被调用。
是否有可能具有相同名称但在不同队列上的任务,由相同的worker运行?或者,如果我想坚持这种方法,我是否需要每个工人只执行一个任务?
回答我自己的问题:
在一个芹菜应用程序中不可能有两个具有相同名称的任务。我可以将上面的两个单独的应用程序分开,并使用不同的worker运行它们,或者为任务提供唯一的名称。
芹菜的关键代码在这里:
https://github.com/celery/celery/blob/8455b0c56797c22ba52abf59f4467ccc19eb9d20/celery/app/base.py