芹菜试图通过task_postrun信号中提高系统退出来关闭工人,但始终挂起并且主进程永远不会退出



我正在尝试通过task_postrun信号中的葡萄干SystemExit()关闭主要的芹菜进程。信号被很好地触发,异常被引发,但工人永远不会完全退出,只是挂在那里。

我该如何完成这项工作?

我是否忘记了某个地方的设置?

以下是我用于辅助角色 (worker.py) 的代码:

from celery import Celery
from celery import signals
app = Celery('tasks',
set_as_current = True,
broker='amqp://guest@localhost//',
backend="mongodb://localhost//",
)
app.config_from_object({
"CELERYD_MAX_TASKS_PER_CHILD": 1,
"CELERYD_POOL": "solo",
"CELERY_SEND_EVENTS": True,
"CELERYD_CONCURRENCY": 1,
"CELERYD_PREFETCH_MULTIPLIER": 1,
})
def shutdown_worker(**kwargs):
print("SHUTTING DOWN WORKER (RAISING SystemExit)")
raise SystemExit()
import tasks
signals.task_postrun.connect(shutdown_worker)
print("STARTING WORKER")
app.worker_main()
print("WORKER EXITED!")

以下是 tasks.py 的代码:

from celery import Celery,task
from celery.signals import task_postrun
import time
from celery.task import Task
class Blah(Task):
track_started = True
acks_late = False
def run(config, kwargs):
time.sleep(5)
return "SUCCESS FROM BLAH"
def get_app():
celery = Celery('tasks',
broker='amqp://guest@localhost//',
backend="mongodb://localhost//"
)
return celery

为了测试这一点,我要做的第一件事是运行worker代码(python worker.py),然后我像这样排队一个任务:

>>> import tasks
>>> results = []
>>> results.append(tasks.Blah.delay({}))

我从工人那里看到的输出是这样的:

STARTING WORKER
-------------- celery@hostname v3.0.9 (Chiastic Slide)
---- **** ----- 
--- * ***  * -- [Configuration]
-- * - **** --- . broker:      amqp://guest@localhost:5672//
- ** ---------- . app:         tasks:0x26f3050
- ** ---------- . concurrency: 1 (solo)
- ** ---------- . events:      ON
- ** ---------- 
- *** --- * --- [Queues]
-- ******* ---- . celery:      exchange:celery(direct) binding:celery
--- ***** ----- 
[2012-11-06 15:59:16,761: WARNING/MainProcess] celery@hostname has started.
[2012-11-06 15:59:21,785: WARNING/MainProcess] SHUTTING DOWN WORKER (RAISING SystemExit)

我期待 python 代码从调用app.worker_main()返回,然后打印WORKER EXITED然后进程完全退出。这永远不会发生,并且必须kill -KILL {PID}工作进程才能消失(它也不能使用任何其他任务。

我想我的主要问题是:

如何使代码从app.worker_main()返回?

我希望能够在执行X数量的任务后完全重新启动工作进程(通过让进程完全退出)。

更新我弄清楚了工人挂在什么上 - 工人(WorkController)在捕获SystemExit异常后挂在呼叫self.stop上。

讨厌我最终回答自己的问题。

Anywhoo,它阻止了WorkControllerMediator组件上的加入调用(Mediator组件上的stop()调用,在停止内部,它joins)。

我通过禁用所有速率限制来摆脱Mediator组件(默认情况下应该是这样,但不是出于某种原因)。

您可以使用以下设置禁用所有速率限制:

CELERY_DISABLE_ALL_RATE_LIMITS: True

希望这也有助于其他人。

和平

相关内容

  • 没有找到相关文章

最新更新