我有一个繁重的外部库类,初始化需要时间并消耗大量内存。我想每个任务实例至少创建一次。
class NlpTask(Task):
def __init__(self):
print('initializing NLP parser')
self._parser = nlplib.Parser()
print('done initializing NLP parser')
@property
def parser(self):
return self._parser
@celery.task(base=NlpTask)
def my_task(arg):
x = my_task.parser.process(arg)
# etc.
Celery 启动 32 个工作进程,因此我希望打印"initializing ... done"
32 次,因为我假设为每个工作线程创建一个任务实例。令人惊讶的是,我只打印了一次。那里到底发生了什么?谢谢。
您的NlpTask
在向工作人员注册时初始化一次。
如果您有两个任务,例如
@celery.task(base=NlpTask)
def foo(arg):
pass
@celery.task(base=NlpTask)
def bar(arg):
pass
然后,当您启动工作线程时,您将看到 2 个初始化。
如果要为每个工作人员初始化一次,可以使用worker_process_init信号。
from celery.signals import worker_process_init
@worker_process_init.connect()
def setup(**kwargs):
print('initializing NLP parser')
# setup
print('done initializing NLP parser')
现在,当您启动工作线程时,您将看到每个进程调用一次安装程序。
为此:
这就是我的观点 - 我希望每个工人一次,而且似乎每个芹菜实例一次。我编辑了这个问题——@davka
答案必须是在connect
中使用发件人过滤器,例如:
@worker_process_init.connect(sender='xx')
def func(sender, **kwargs):
if sender == 'xx':
# dosomething
但我发现它在芹菜 4.0.2 中不起作用。