芹菜任务路由未按预期工作



我正在练习芹菜,我想将我的任务分配给特定的队列,但它没有按预期工作

我的__init__.py

import os
import sys
from celery import Celery
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(CURRENT_DIR)
app = Celery()
app.config_from_object('celery_config')

我的celery_config.py

amqp = 'amqp://guest:guest@localhost:5672//'
broker_url = amqp
result_backend = amqp
task_routes = ([
('import_feed', {'queue': 'queue_import_feed'})
])

我的tasks.py

from . import app
@app.task(name='import_feed')
def import_feed():
pass

我如何运行我的辅助角色:

celery -A subscriber1.tasks worker -l info

我的客户__init__.py

import os
import sys
from celery import Celery
CURRENT_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.append(CURRENT_DIR)
app = Celery()
app.config_from_object('celery_config')

我的客户celery_config.py

from kombu.common import Broadcast
amqp = 'amqp://guest:guest@localhost:5672//'
BROKER_URL = amqp
CELERY_RESULT_BACKEND = amqp

然后在我客户的外壳中,我尝试了:

from publisher import app
result = app.send_task('import_feed')

然后我的工人得到了任务?!我期望不应该,因为我将其分配给特定的队列。我在我的客户端中尝试了以下命令,但我的工作人员没有收到任何任务,我希望在第一个任务上收到

result = app.send_task('import_feed', queue='queue_import_feed')

似乎我误解了路由部分的某些内容。但我真正想要的是import_feed只有在发送任务时指定了queue_import_feed队列时才运行任务

您可以更改工作器处理的默认队列。

app.send_task('import_feed')将任务发送到队列celery

app.send_task('import_feed', queue='queue_import_feed')将任务发送到queue_import_feed但您的工作人员只处理队列celery任务。

要处理特定队列,请使用-Q开关

celery -A subscriber1.tasks worker -l info -Q 'queue_import_feed'

编辑

为了对send_task施加限制,以便工作人员仅在使用队列发布任务时import_feed任务做出反应,您需要覆盖Celery上的send_task,并提供default_queue设置为None的自定义AMQP

reactor.py

from celery.app.amqp import AMQP
from celery import Celery
class MyCelery(Celery):
def send_task(self, name=None, args=None, kwargs=None, **options):
if 'queue' in options:
return super(MyCelery, self).send_task(name, args, kwargs, **options)

class MyAMQP(AMQP):
default_queue = None

celery_config.py

from kombu import Exchange, Queue
...
task_exchange = Exchange('default', type='direct')
task_create_missing_queues = False
task_queues = [
Queue('feed_queue', task_exchange, routing_key='feeds'),
]
task_routes = {
'import_feed': {'queue': 'feed_queue', 'routing_key': 'feeds'}
}

__init__.py

celeree = MyCelery(amqp='reactor.MyAMQP')

相关内容

  • 没有找到相关文章

最新更新