我想知道,使用相同的代理让不同的 Celery 实例对象是一种好的做法吗?
目前,我有一个 rabbitmq,充当 3 个芹菜实例之间共享的单一经纪人。我的芹菜实例如下
insider_transaction
- 固定计划工作人员。每分钟运行一次earning
- 由 Web 服务器创建的工作线程。stock_price
- 由 Web 服务器创建的工作线程。
我设计了每个工人在自己的 docker 容器中运行。我预计 3 名工人将彼此独立运行。
但是,我意识到事实并非如此!
例如,earning
工作人员将错误地接收假定只有stock_price
或insider_transaction
接收的消息。
您将看到earning
工作人员收到的此类消息。
earning_1 | The message has been ignored and discarded.
earning_1 |
earning_1 | Did you remember to import the module containing this task?
earning_1 | Or maybe you're using relative imports?
earning_1 |
earning_1 | Please see
earning_1 | http://docs.celeryq.org/en/latest/internals/protocol.html
earning_1 | for more information.
earning_1 |
earning_1 | The full contents of the message body was:
earning_1 | '[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
earning_1 | Traceback (most recent call last):
earning_1 | File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 561, in on_task_received
earning_1 | strategy = strategies[type_]
earning_1 | KeyError: 'insider_transaction.run'
而这个
earning_1 | The message has been ignored and discarded.
earning_1 |
earning_1 | Did you remember to import the module containing this task?
earning_1 | Or maybe you're using relative imports?
earning_1 |
earning_1 | Please see
earning_1 | http://docs.celeryq.org/en/latest/internals/protocol.html
earning_1 | for more information.
earning_1 |
earning_1 | The full contents of the message body was:
earning_1 | '[[2, 3], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (81b)
earning_1 | Traceback (most recent call last):
earning_1 | File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 561, in on_task_received
earning_1 | strategy = strategies[type_]
earning_1 | KeyError: 'stock_price.mul'
我不希望这样的事情发生。在我的网络服务器端代码(烧瓶)中。我写了
celery0 = Celery('earning',
broker=CELERY_BROKER_URL,
backend=CELERY_RESULT_BACKEND)
celery1 = Celery('stock_price',
broker=CELERY_BROKER_URL,
backend=CELERY_RESULT_BACKEND)
@app.route('/do_work/<int:param1>/<int:param2>')
def do_work(param1,param2):
task0 = celery0.send_task('earning.add', args=[param1, param2], kwargs={})
task1 = celery1.send_task('stock_price.mul', args=[param1, param2], kwargs={})
因此,我希望earning
工作人员只会收到earning
消息,而不会收到stock_price
消息,也不会收到insider_transaction
消息。
我可以知道,为什么会出现这个问题?芹菜共享单个代理的不同实例是不可能的吗?
演示此问题的项目可以从 https://github.com/yccheok/celery-hello-world 签出
docker-compose build
docker-compose up -d
http://localhost:5000/do_work/2/3
docker-compose up earning
您是否正在使用路由密钥?您可以使用路由密钥告诉交易所要处理哪些队列的任务。在芹菜配置中设置这些可能有助于防止错误的工作人员使用错误的消息。