检查是否在芹菜任务中



如何检查一个函数是否由芹菜执行?

def notification():
   # in_celery() returns True if called from celery_test(), 
   #                     False if called from not_celery_test()
   if in_celery():
      # Send mail directly without creation of additional celery subtask
      ...
   else:
      # Send mail with creation of celery task
      ...
@celery.task()
def celery_test():
    notification()
def not_celery_test():
    notification()

这里有一种使用celery.current_task的方法。以下是任务要使用的代码:

def notification():
    from celery import current_task
    if not current_task:
        print "directly called"
    elif current_task.request.id is None:
        print "called synchronously"
    else:
        print "dispatched"
@app.task
def notify():
    notification()

这是你可以运行的代码来练习上面的内容:

        from core.tasks import notify, notification
        print "DIRECT"
        notification()
        print "NOT DISPATCHED"
        notify()
        print "DISPATCHED"
        notify.delay().get()

我在第一个代码段中的任务代码位于一个名为core.tasks的模块中。我在自定义Django管理命令的最后一段代码中插入了代码。这测试了3种情况:

  • 直接调用notification

  • 通过同步执行的任务调用notification。也就是说,此任务不会通过Celery发送给工人。任务的代码在调用notify的同一进程中执行。

  • 通过工作人员运行的任务调用notification。任务的代码在与启动它的进程不同的进程中执行。

输出为:

NOT DISPATCHED
called synchronously
DISPATCHED
DIRECT
directly called

DISPATCHED之后的输出中,任务中没有来自print的行,因为该行最终出现在工作日志中:

[2015-12-17 07:23:57,527: WARNING/Worker-4] dispatched

重要提示:我最初在第一次测试中使用if current_task is None,但它不起作用。我检查了又检查。不知怎的,Celery将current_task设置为一个看起来像None的对象(如果在上面使用repr,则得到None),但不是None。不知道那里发生了什么。使用CCD_ 15工作。

此外,我已经在Django应用程序中测试了上面的代码,但我还没有在生产中使用过它。可能有我不知道的问题。

相关内容

  • 没有找到相关文章

最新更新