MongoDB变成一个芹菜任务 - 烧瓶应用程序



我正在尝试在我的Flask应用程序上使用Celery。 我正在文件insight_tasks.py中定义任务。 在该文件中定义了一个函数:

@celery_app.task
def save_insights_task()

该函数做了一些事情,然后出现了错误,我正在尝试将数据保存到MongoDB中,控制台抛出我:MongoEngineConnectionError('您尚未定义默认连接',)所以我认为这是因为MongoEngine还没有初始化,这是我的问题:

我应该如何在Celery Task中使用MongoDB?,因为当我在我的路由上使用MongoDB时(烧瓶应用程序),它按预期工作。

芹菜不共享数据库实例?

文件:

__init__.py(芹菜初始化)

celery_app = Celery('insights',
broker=config.CELERY_LOCATIONS_BROKER_URL,
backend=config.CELERY_LOCATIONS_RESULT_BACKEND,
include=['app.insight_tasks']
)

insight_tasks.py

from app.google import google_service
from app.models import LocationStats
from . import celery_app
from firebase_admin import db as firebase_db
import arrow

@celery_app.task
def save_insight_task(account_location, uid, gid, locations_obj, aggregation):
try:
insights, reviews = google_service.store_location_resources(
gid, uid,
start_datetime, end_datetime,
account_location, aggregation
)
except StandardError as err:
from pprint import pprint
import traceback
pprint(err)
pprint(traceback.print_exc())
path = 'saved_locations/{}/accounts/{}'.format(gid, account_location)
location = [loc for loc in locations_obj if loc['name'] == 'accounts/' + account_location]
if len(location) > 0:
firebase_db.reference(path).update(location[0])

这里google_service.store_location_resources()是将数据保存到MongoDB中的函数。 此函数由我的应用程序的路由在另一端使用,因此它按预期工作,除了在 Celery 任务

---------

芹菜任务被调用到POST请求中

帐户/路由.py

@account.route('/save/queue', methods=['POST'])
def save_all_locations():
data = request.data
dataDict = json.loads(data)
uid = request.headers.get('uid', None)
gid = request.headers.get('gid', None)
account_locations = dataDict['locations']
locations_obj = dataDict['locations_obj']
for path in account_locations:
save_insight_task.delay(account_location=path, uid=uid, gid=gid, locations_obj=locations_obj, aggregate='SOME_TEXT')

您应该连接到任务中的数据库。原因是子进程(由 Celery 创建)必须有自己的 mongo 客户端实例。

更多细节在这里 : 使用 PyMongo 与多处理

例如定义一个utils.py

from pymodm import connect
def mongo_connect():
return connect("mongodb://{0}:{1}/{2}".format(MONGODB['host'],
MONGODB['port'],
MONGODB['db_name']),
alias=MONGODB['db_name'])

然后在insight_tasks.py

from utils import mongo_connect
@celery_app.task
def save_insight_task(account_location, uid, gid, locations_obj, aggregation):
# connect to mongodb
mongo_connect()
# do your db operations
try:
insights, reviews = google_service.store_location_resources(
gid, uid,
start_datetime, end_datetime,
account_location, aggregation
)
except StandardError as err:
from pprint import pprint
import traceback
pprint(err)
pprint(traceback.print_exc())
path = 'saved_locations/{}/accounts/{}'.format(gid, account_location)
location = [loc for loc in locations_obj if loc['name'] == 'accounts/' + account_location]
if len(location) > 0:
firebase_db.reference(path).update(location[0])

请注意,我使用pymodm包而不是mongoengine包作为 mongo 的 ODM。

最新更新