我对Celery很陌生,我一直在尝试设置一个有两个独立队列的项目(一个要计算,另一个要执行)。到目前为止,一切都很好。
我的问题是,执行队列中的工作者需要实例化一个具有唯一object_id(每个工作者一个id)的类。我想知道我是否可以编写一个自定义的工作程序初始化来在启动时初始化对象,并将其保存在内存中,直到工作程序被杀死。
我在custom_task上发现了一个类似的问题,但所提出的解决方案在我的情况下不起作用。
考虑以下玩具示例:
celery.py
from celery import Celery
app = Celery('proj',
broker='amqp://guest@localhost//',
backend='amqp://',
include=['proj.tasks'])
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=60,
CELERY_ROUTES = {"proj.tasks.add1": {"queue": "q1"}},
)
if __name__ == '__main__':
app.start()
tasks.py
from proj.celery import app
from celery.signals import worker_init
@worker_init.connect(sender='worker1@hostname')
def configure_worker1(*args, **kwargs):
#SETUP id=1 for add1 here???
@worker_init.connect(sender='worker2@hostname')
def configure_worker2(*args, **kwargs):
#SETUP id=2 for add1 here???
@app.task
def add1(y):
return id + y
@app.task
def add(x, y):
return x + y
初始化:
celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info
这是正确的方法吗?如果是,我应该在tasks.py
的configure_worker1
函数中写些什么来在worker初始化时设置id
?
感谢
我通过以下操作找到了答案http://docs.celeryproject.org/en/latest/userguide/tasks.html#instantiation
tasks.py看起来像这样:
from proj.celery import app
from celery import Task
class Task1(Task):
def __init__(self):
self._x = 1.0
class Task2(Task):
def __init__(self):
self._x = 2.0
@app.task(base=Task1)
def add1(y):
return add1._x + y
@app.task(base=Task2)
def add2(y):
return add2._x + y
像以前一样初始化:
celery multi start worker1 -A proj -l info -Q q1
celery multi start worker2 -A proj -l info -Q q1
celery multi start worker3 -A proj -l info