使用send_task时,如何让Celery将任务发送给合适的工作人员?
例如,给定以下服务:
service_add.py
from celery import Celery
celery = Celery('service_add', backend='redis://localhost', broker='pyamqp://')
@celery.task
def add(x, y):
return x + y
service_sub.py
from celery import Celery
celery = Celery('service_sub', backend='redis://localhost', broker='pyamqp://') #redis backend,rabbitmq for messaging
@celery.task
def sub(x, y):
return x - y
以下代码保证会失败:
主.py
from celery.execute import send_task
result1 = send_task('service_sub.sub',(1,1)).get()
result2 = send_task('service_sub.sub',(1,1)).get()
出现异常celery.exceptions.NotRegistered:"service_sub.sub",因为celery以循环方式向每个进程发送任务,即使service_sub只属于一个进程。
为了完成这个问题,以下是我如何运行进程和配置文件:
celery -A service_sub worker --loglevel=INFO --pool=solo -n worker1
celery -A service_add worker --loglevel=INFO --pool=solo -n worker2
通过快速配置
## Broker settings.
broker_url = 'pyamqp://'
# List of modules to import when the Celery worker starts.
imports = ('service_add.py','service_sub.py')
如果您使用两个不同的应用程序service_add
/service_sub
只是为了将任务路由到专门的工作人员,我想建议另一种解决方案。如果不是这样,并且您仍然需要两个(或多个(应用程序,我建议您更好地封装像amqp://localhost:5672/add_vhost
&后端:redis://localhost/1
。在rabbitMQ中使用专用vhost
,在Redis中使用专用数据库id(1(就可以了。
话虽如此,我认为在这种情况下,正确的解决方案是使用相同的芹菜应用程序(不拆分为两个应用程序(并使用路由器:
task_routes = {'tasks.service_add': {'queue': 'add'}, 'tasks.service_sub': {'queue': 'sub'}}
将其添加到配置中:
app.conf.task_routes = task_routes
并使用Q
(从中读取消息的队列(运行您的工作人员:
celery -A shared_app worker --loglevel=INFO --pool=solo -n worker1 -Q add
celery -A shared_app worker --loglevel=INFO --pool=solo -n worker2 -Q sub
请注意,这种方法有更多的好处,比如如果您希望在任务之间有一些依赖关系(画布(。
有更多的方法可以定义路由器,你可以在这里阅读更多。