setup:Celery 4.1,RabbitMQ 3.6.1(作为代理(,Redis(作为后端,在这里无关紧要(。
有两个兔子用户:
- 具有
.* .* .*
权限的admin_user
。 remote_user
具有ack ack ack
的权限。
admin_user
可以触发任务,并被芹菜工人用来处理任务。
remote_user
只能触发一种类型的任务 -ack
并在专用ack
队列中排队,稍后由ack
worker (由admin_user
使用(。
remote_user
通过以下代码发送任务:
from celery import Celery
app = Celery('remote', broker='amqp://remote_user:remote_pass@<machine_ip>:5672/vhost')
app.send_task('ack', args=('a1', 'a2'), queue='ack', route_name='ack')
这在 Celery 3.1 中完美运行。升级到 Celery 4.1 后,它不再发送任务。调用返回一个AsyncResult
但我在芹菜花(或通过兔子管理 ui(或日志中看不到该消息。
- 尝试像
admin_user
一样将权限设置为remote_user
.* .* .*
- 无济于事。 - 尝试添加
administrator
标签 - 无济于事。
将代理的用户更改为'amqp://admin_user:admin_pass@<machine_ip>:5672/vhost'
确实有效:
from celery import Celery
app = Celery('remote', broker='amqp://admin_user:admin_pass@<machine_ip>:5672/vhost')
app.send_task('ack', args=('a1', 'a2'), queue='ack', route_name='ack')
但我不想给远程机器admin_user
权限。 知道我能做什么吗?
已解决, 我猜 API 发生了变化,但为了保持 RabbitMQ 的当前权限,我必须使用以下路由:
old_celery_config.py:(芹菜 3.1(
CELERY_ROUTES = {
'ack_task': {
'queue': 'geo_ack'
}
}
celery_config.py:(芹菜 4.1(
CELERY_ROUTES = {
'ack_task': {
'exchange': 'ack',
'exchange_type': 'direct',
'routing_key': 'ack'
}
}
run_task.py:
from celery import Celery
app = Celery('remote', broker='amqp://remote_user:remote_pass@<machine_ip>:5672/vhost')
app.config_from_object('celery_config')
app.send_task('ack_task', args=('a1', 'a2'))