Python-Celery如何为每个工作线程建立异步连接



我创建了几个对Celery任务的端点调用,这些任务针对DB预先形成不同的任务。显然,每次都重新连接到DB是没有意义的,但从另一方面来说——什么时候应该关闭连接?使用到DB的异步连接有意义吗?

我不确定我如何才能实现这一点,如果将Async与Celery一起使用有意义,我将感谢的任何指导

import os
import traceback
from celery import Celery
from celery.utils.log import get_task_logger
from config.config import *

app = Celery('proj',
broker=config('CELERY_BROKER_URL'),
backend=config('CELERY_RESULT_BACKEND'),
include=['proj.tasks','proj.fetch_data'])
app.conf.update(
result_expires=3600,
)

app.autodiscover_tasks()

if __name__ == '__main__':
app.start()

我想到了worker_process_init、worker_process_shutdown、

注:database_ps_ms_stg-是基于数据库的(异步到postgres(。

tasks.py

from .celery import app
from celery.signals import worker_process_init, worker_process_shutdown,task_postrun
from config.db import database_ps_ms_stg 
import asyncio

@worker_process_init.connect
async def init_worker(**kwargs):

if not database_ps_ms_stg.is_connected:
await database_ps_ms_stg.connect()
print ("connected to database_ps_ms_stg") 

@worker_process_shutdown.connect
async def shutdown_worker(**kwargs):

if  database_ps_ms_stg.is_connected:
await database_ps_ms_stg.disconnect()
print ("disconneceting from database_ps_ms_stg") 

获取:

[2021-07-18 16:23:16,951: WARNING/ForkPoolWorker-1] /usr/local/lib/python3.8/site-packages/celery/concurrency/prefork.py:77: RuntimeWarning: coroutine 'init_worker' was never awaited
signals.worker_process_init.send(sender=None)
```

您的协同程序没有被安排在任何事件循环中执行。例如,这个代码

@worker_process_init.connect
async def init_worker(**kwargs):

if not database_ps_ms_stg.is_connected:
await database_ps_ms_stg.connect()
print ("connected to database_ps_ms_stg") 

只是在worker_process_init启动时创建一个协程对象,但之后不处理此对象。

这可能是一个解决方案——用一个调度器装饰器包装您的协同程序,它将启动它们

def async2sync(func):
def wrapper(*args, **kwargs):
loop = asyncio.get_event_loop()
task = loop.ensure_future(func())
task.add_done_callback(lambda f: loop.stop())
loop.run_forever()
try:
return task.result()
except asyncio.CancelledError:
pass
return wrapper
...    
@worker_process_init.connect
@async2sync
async def init_worker(**kwargs):

if not database_ps_ms_stg.is_connected:
await database_ps_ms_stg.connect()
print ("connected to database_ps_ms_stg") 

请检查这是否可以回答您的一些问题。我的观点是,这不值得,最好只使用阻塞连接器,只要您的代码打算与芹菜一起使用。

最新更新