当我的一个单元测试删除SQLAlchemy对象时,该对象会触发after_delete事件,该事件会触发Celery任务从驱动器中删除文件。
测试时任务为CELERY_ALWAYS_EAGER = True
。
要点以轻松再现问题
该示例有两个测试。其中一个触发事件中的任务,另一个在事件之外。只有事件中的一个会关闭连接。
要快速再现错误,您可以运行:
git clone https://gist.github.com/5762792fc1d628843697.git
cd 5762792fc1d628843697
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
python test.py
堆栈:
$ python test.py
E
======================================================================
ERROR: test_delete_task (__main__.CeleryTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
File "test.py", line 73, in test_delete_task
db.session.commit()
File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/scoping.py", line 150, in do
return getattr(self.registry(), name)(*args, **kwargs)
File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 776, in commit
self.transaction.commit()
File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 377, in commit
self._prepare_impl()
File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 357, in _prepare_impl
self.session.flush()
File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1919, in flush
self._flush(objects)
File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush
transaction.rollback(_capture_exception=True)
File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 63, in __exit__
compat.reraise(type_, value, traceback)
File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush
transaction.rollback(_capture_exception=True)
File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 393, in rollback
self._assert_active(prepared_ok=True, rollback_ok=True)
File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 223, in _assert_active
raise sa_exc.ResourceClosedError(closed_msg)
ResourceClosedError: This transaction is closed
----------------------------------------------------------------------
Ran 1 test in 0.014s
FAILED (errors=1)
我想我发现了问题——问题出在如何设置Celery任务上。如果你从芹菜设置中删除了应用程序上下文调用,一切都会正常运行:
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
# deleted --> with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
SQLAlchemy文档中有一个关于在after_delete事件期间永远不要修改会话的重大警告:http://docs.sqlalchemy.org/en/latest/orm/events.html#sqlalchemy.orm.events.MapperEvents.after_delete
因此,我怀疑with app.app_context():
在删除过程中被调用,试图附加和/或修改Flask SQLAlchemy存储在app
对象中的会话,因此整个事情正在爆炸。
Flask SQlAlchemy在幕后为你做了很多魔术,但你可以绕过它,直接使用SQlAlchemy。如果您需要在删除事件期间与数据库对话,您可以创建一个到数据库的新会话:
@celery.task()
def my_task():
# obviously here I create a new object
session = db.create_scoped_session()
session.add(User(id=13, value="random string"))
session.commit()
return
但听起来你并不需要这个,你只是想删除一个图像路径。在这种情况下,我会改变你的任务,让它走一条路:
# instance will call the task
@event.listens_for(User, "after_delete")
def after_delete(mapper, connection, target):
my_task.delay(target.value)
@celery.task()
def my_task(image_path):
os.remove(image_path)
希望这会有所帮助——如果这些对你不起作用,请告诉我。感谢您非常详细的设置,它确实有助于调试。
类似于deBrice提出的答案,但使用了类似于Rachel的方法。
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
import flask
# tests will be run in unittest app context
if flask.current_app:
return TaskBase.__call__(self, *args, **kwargs)
else:
# actual workers need to enter worker app context
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
from celery import signals
def make_celery(app):
...
@signals.task_prerun.connect
def add_task_flask_context(sender, **kwargs):
if not sender.request.is_eager:
sender.request.flask_context = app.app_context().__enter__()
@signals.task_postrun.connect
def cleanup_task_flask_context(sender, **kwargs):
flask_context = getattr(sender.request, 'flask_context', None)
if flask_context is not None:
flask_context.__exit__(None, None, None)