文档表示,自定义任务只实例化一次,这是缓存所有任务调用所需数据(如数据库连接(的好方法。但它似乎同时发生在工作者和呼叫者身上。任务MCVE:
# tasks.py
from celery import Celery, Task
from time import sleep
celery = Celery(
broker="redis://127.0.0.1:6379/0",
backend="redis://127.0.0.1:6379/0"
)
class PatternTask(Task):
def __init__(self):
print("Initialising task")
sleep(10)
self._pattern = "Hello, %s!"
print("Initialised task")
@property
def pattern(self):
return self._pattern
@celery.task(base=PatternTask)
def hello(who):
sleep(2)
return hello.pattern % who
和调用代码:
# main.py
from tasks import hello
print(hello.delay("world").get())
print(hello.delay("you").get())
这将使工作人员和调用代码延迟10秒:
$ python main.py
Initialising task
# <10 seconds>
Initialised task
# <2 seconds>
Hello, world!
# <2 seconds>
Hello, you!
我理解这对于支持hello("now")
的情况是必要的,其中不涉及工人。然而,有没有办法保证我永远不会这样做,并避免在调用代码中分配_pattern
时昂贵的sleep
操作和巨大的时间、内存和CPU资源浪费†?如果没有,那么对于这种情况,建议的解决方案是什么?
†(实际用例是加载工人操作所需的千兆字节数据,调用代码将无法使用这些数据。
解决方案是:芹菜信号。
# tasks.py
from celery import Celery, Task, signals
from time import sleep
celery = Celery(
broker="redis://127.0.0.1:6379/0",
backend="redis://127.0.0.1:6379/0"
)
class PatternTask(Task):
def __init__(self):
super().__init__()
signals.worker_init.connect(self.on_worker_init)
def on_worker_init(self, *args, **kwargs):
print("Initialising task")
sleep(10)
self._pattern = "Hello, %s!"
print("Initialised task")
@property
def pattern(self):
return self._pattern
@celery.task(bind=True, base=PatternTask)
def hello(self, who):
print(f"In hello {who}")
sleep(2)
print(f"Done hello {who}")
return self.pattern % who
在celery.signals.worker_init
处理程序全部完成之前,工作人员显然不会开始接受作业。(我还使用了bind=True
使其更易于维护;这与解决方案无关。(