是每个工作进程初始化的芹菜任务,还是每个应用初始化一次



我有一个繁重的外部库类,初始化需要时间并消耗大量内存。我想每个任务实例至少创建一次。

class NlpTask(Task):
    def __init__(self):
        print('initializing NLP parser')
        self._parser = nlplib.Parser()
        print('done initializing NLP parser')
    @property
    def parser(self):
        return self._parser
@celery.task(base=NlpTask)
def my_task(arg):
    x = my_task.parser.process(arg)
    # etc.
Celery 启动 32 个工作

进程,因此我希望打印"initializing ... done" 32 次,因为我假设为每个工作线程创建一个任务实例。令人惊讶的是,我只打印了一次。那里到底发生了什么?谢谢。

您的NlpTask在向工作人员注册时初始化一次。

如果您有两个任务,例如

@celery.task(base=NlpTask)
def foo(arg):
    pass

@celery.task(base=NlpTask)
def bar(arg):
    pass

然后,当您启动工作线程时,您将看到 2 个初始化。

如果要为每个工作人员初始化一次,可以使用worker_process_init信号。

from celery.signals import worker_process_init

@worker_process_init.connect()
def setup(**kwargs):
    print('initializing NLP parser')
    # setup
    print('done initializing NLP parser')

现在,当您启动工作线程时,您将看到每个进程调用一次安装程序。

为此:

这就是我的观点 - 我希望每个工人一次,而且似乎每个芹菜实例一次。我编辑了这个问题——@davka

答案必须是在connect中使用发件人过滤器,例如:

@worker_process_init.connect(sender='xx')
def func(sender, **kwargs):
    if sender == 'xx':
        # dosomething

但我发现它在芹菜 4.0.2 中不起作用。

相关内容

  • 没有找到相关文章

最新更新