是否有一种方法可以确定,以编程方式,当前模块被导入/运行是在一个芹菜工人的上下文中这样做?
我们已经决定在运行Celery worker之前设置一个环境变量,并在代码中检查这个环境变量,但我想知道是否有更好的方法?
简单,
import sys
IN_CELERY_WORKER_PROCESS = sys.argv and sys.argv[0].endswith('celery')
and 'worker' in sys.argv
if IN_CELERY_WORKER_PROCESS:
print ('Im in Celery worker')
http://percentl.com/blog/django-how-can-i-detect-whether-im-running-celery-worker/从芹菜4.2开始,您也可以通过在worker_ready
信号
in celery.py
:
from celery.signals import worker_ready
app = Celery(...)
app.running = False
@worker_ready.connect
def set_running(*args, **kwargs):
app.running = True
现在你可以使用全局应用实例检查你的任务看看你是否在跑步。这对于确定使用哪个记录器非常有用。
根据您的用例场景,您可以通过检查请求id是否设置来检测它:
@app.task(bind=True)
def foo(self):
print self.request.id
如果您调用上面的foo.delay()
,那么任务将被发送到一个worker,并且self.request.id
将被设置为一个唯一的数字。如果您调用它作为foo()
,那么它将在当前进程中执行,self.request.id
将是None
。
您可以使用Celery
应用程序实例类中的current_worker_task
属性。文档在这里。
定义了以下任务:
# whatever_app/tasks.py
celery_app = Celery(app)
@celery_app.task
def test_task():
if celery_app.current_worker_task:
return 'running in a celery worker'
return 'just running'
您可以在python shell中运行以下命令:
In [1]: from whatever_app.tasks import test_task
In [2]: test_task()
Out[2]: 'just running'
In [3]: r = test_task.delay()
In [4]: r.result
Out[4]: u'running in a celery worker'
注意:显然,要使test_task.delay()
成功,您需要至少运行一个芹菜工作器,并配置为从whatever_app.tasks
加载任务。
添加环境变量是检查模块是否由celery worker运行的好方法。在任务提交进程中,我们可以设置环境变量,以标记它不在芹菜worker的上下文中运行。
但是更好的方法可能是使用一些芹菜信号,这可能有助于知道模块是在worker还是task submit中运行。例如,worker-process-init信号被发送到每个子任务执行进程(在预分叉模式下),处理程序可以用来设置一些全局变量,表明它是一个工作进程。
使用名称启动worker是一个很好的做法,这样管理(停止/杀死/重新启动)它们变得更容易。你可以使用-n
来命名一个worker。
celery worker -l info -A test -n foo
现在,在您的脚本中,您可以使用app.control.inspect
来查看该worker是否正在运行。
In [22]: import test
In [23]: i = test.app.control.inspect(['foo'])
In [24]: i.app.control.ping()
Out[24]: [{'celery@foo': {'ok': 'pong'}}]
你可以在celery worker docs中阅读更多信息