如何检测我是否在芹菜工人中运行?



是否有一种方法可以确定,以编程方式,当前模块被导入/运行是在一个芹菜工人的上下文中这样做?

我们已经决定在运行Celery worker之前设置一个环境变量,并在代码中检查这个环境变量,但我想知道是否有更好的方法?

简单,

import sys
IN_CELERY_WORKER_PROCESS = sys.argv and sys.argv[0].endswith('celery')
    and 'worker' in sys.argv
if IN_CELERY_WORKER_PROCESS:
    print ('Im in Celery worker')
http://percentl.com/blog/django-how-can-i-detect-whether-im-running-celery-worker/

从芹菜4.2开始,您也可以通过在worker_ready信号

上设置标志来实现这一点

in celery.py:


from celery.signals import worker_ready
app = Celery(...)
app.running = False
@worker_ready.connect
def set_running(*args, **kwargs):
    app.running = True

现在你可以使用全局应用实例检查你的任务看看你是否在跑步。这对于确定使用哪个记录器非常有用。

根据您的用例场景,您可以通过检查请求id是否设置来检测它:

@app.task(bind=True)
def foo(self):
    print self.request.id

如果您调用上面的foo.delay(),那么任务将被发送到一个worker,并且self.request.id将被设置为一个唯一的数字。如果您调用它作为foo(),那么它将在当前进程中执行,self.request.id将是None

您可以使用Celery应用程序实例类中的current_worker_task属性。文档在这里。

定义了以下任务:

 # whatever_app/tasks.py
 celery_app = Celery(app)
 @celery_app.task
 def test_task():
     if celery_app.current_worker_task:
         return 'running in a celery worker'
     return 'just running'

您可以在python shell中运行以下命令:

In [1]: from whatever_app.tasks import test_task
In [2]: test_task()
Out[2]: 'just running'
In [3]: r = test_task.delay()
In [4]: r.result
Out[4]: u'running in a celery worker' 

注意:显然,要使test_task.delay()成功,您需要至少运行一个芹菜工作器,并配置为从whatever_app.tasks加载任务。

添加环境变量是检查模块是否由celery worker运行的好方法。在任务提交进程中,我们可以设置环境变量,以标记它不在芹菜worker的上下文中运行。

但是更好的方法可能是使用一些芹菜信号,这可能有助于知道模块是在worker还是task submit中运行。例如,worker-process-init信号被发送到每个子任务执行进程(在预分叉模式下),处理程序可以用来设置一些全局变量,表明它是一个工作进程。

使用名称启动worker是一个很好的做法,这样管理(停止/杀死/重新启动)它们变得更容易。你可以使用-n来命名一个worker。

celery worker -l info -A test -n foo

现在,在您的脚本中,您可以使用app.control.inspect来查看该worker是否正在运行。

In [22]: import test
In [23]: i = test.app.control.inspect(['foo'])
In [24]: i.app.control.ping()
Out[24]: [{'celery@foo': {'ok': 'pong'}}]

你可以在celery worker docs中阅读更多信息

相关内容

  • 没有找到相关文章

最新更新