通过send_task让合适的工作人员执行任务



使用send_task时,如何让Celery将任务发送给合适的工作人员?

例如,给定以下服务:

service_add.py

from celery import Celery

celery = Celery('service_add', backend='redis://localhost', broker='pyamqp://') 

@celery.task
def add(x, y):
return x + y

service_sub.py

from celery import Celery
celery = Celery('service_sub', backend='redis://localhost', broker='pyamqp://') #redis backend,rabbitmq for messaging
@celery.task
def sub(x, y):
return x - y

以下代码保证会失败:

主.py

from celery.execute import send_task
result1 = send_task('service_sub.sub',(1,1)).get()
result2 = send_task('service_sub.sub',(1,1)).get()

出现异常celery.exceptions.NotRegistered:"service_sub.sub",因为celery以循环方式向每个进程发送任务,即使service_sub只属于一个进程。

为了完成这个问题,以下是我如何运行进程和配置文件:

celery -A  service_sub worker --loglevel=INFO --pool=solo -n worker1
celery -A  service_add worker --loglevel=INFO --pool=solo -n worker2

通过快速配置

## Broker settings.
broker_url = 'pyamqp://'
# List of modules to import when the Celery worker starts.
imports = ('service_add.py','service_sub.py')

如果您使用两个不同的应用程序service_add/service_sub只是为了将任务路由到专门的工作人员,我想建议另一种解决方案。如果不是这样,并且您仍然需要两个(或多个(应用程序,我建议您更好地封装像amqp://localhost:5672/add_vhost&后端:redis://localhost/1。在rabbitMQ中使用专用vhost,在Redis中使用专用数据库id(1(就可以了。

话虽如此,我认为在这种情况下,正确的解决方案是使用相同的芹菜应用程序(不拆分为两个应用程序(并使用路由器:

task_routes = {'tasks.service_add': {'queue': 'add'}, 'tasks.service_sub': {'queue': 'sub'}}

将其添加到配置中:

app.conf.task_routes = task_routes

并使用Q(从中读取消息的队列(运行您的工作人员:

celery -A  shared_app worker --loglevel=INFO --pool=solo -n worker1 -Q add
celery -A  shared_app worker --loglevel=INFO --pool=solo -n worker2 -Q sub

请注意,这种方法有更多的好处,比如如果您希望在任务之间有一些依赖关系(画布(。

有更多的方法可以定义路由器,你可以在这里阅读更多。

相关内容

  • 没有找到相关文章

最新更新