如何检查一个函数是否由芹菜执行?
def notification():
# in_celery() returns True if called from celery_test(),
# False if called from not_celery_test()
if in_celery():
# Send mail directly without creation of additional celery subtask
...
else:
# Send mail with creation of celery task
...
@celery.task()
def celery_test():
notification()
def not_celery_test():
notification()
这里有一种使用celery.current_task
的方法。以下是任务要使用的代码:
def notification():
from celery import current_task
if not current_task:
print "directly called"
elif current_task.request.id is None:
print "called synchronously"
else:
print "dispatched"
@app.task
def notify():
notification()
这是你可以运行的代码来练习上面的内容:
from core.tasks import notify, notification
print "DIRECT"
notification()
print "NOT DISPATCHED"
notify()
print "DISPATCHED"
notify.delay().get()
我在第一个代码段中的任务代码位于一个名为core.tasks
的模块中。我在自定义Django管理命令的最后一段代码中插入了代码。这测试了3种情况:
直接调用
notification
。通过同步执行的任务调用
notification
。也就是说,此任务不会通过Celery发送给工人。任务的代码在调用notify
的同一进程中执行。通过工作人员运行的任务调用
notification
。任务的代码在与启动它的进程不同的进程中执行。
输出为:
NOT DISPATCHED
called synchronously
DISPATCHED
DIRECT
directly called
在DISPATCHED
之后的输出中,任务中没有来自print
的行,因为该行最终出现在工作日志中:
[2015-12-17 07:23:57,527: WARNING/Worker-4] dispatched
重要提示:我最初在第一次测试中使用if current_task is None
,但它不起作用。我检查了又检查。不知怎的,Celery将current_task
设置为一个看起来像None
的对象(如果在上面使用repr
,则得到None
),但不是None
。不知道那里发生了什么。使用CCD_ 15工作。
此外,我已经在Django应用程序中测试了上面的代码,但我还没有在生产中使用过它。可能有我不知道的问题。