当SQLAlchemy事件触发Celery任务时,连接将关闭



当我的一个单元测试删除SQLAlchemy对象时,该对象会触发after_delete事件,该事件会触发Celery任务从驱动器中删除文件。

测试时任务为CELERY_ALWAYS_EAGER = True

要点以轻松再现问题

该示例有两个测试。其中一个触发事件中的任务,另一个在事件之外。只有事件中的一个会关闭连接。

要快速再现错误,您可以运行:

git clone https://gist.github.com/5762792fc1d628843697.git
cd 5762792fc1d628843697
virtualenv venv
. venv/bin/activate
pip install -r requirements.txt
python test.py

堆栈:

$     python test.py
E
======================================================================
ERROR: test_delete_task (__main__.CeleryTestCase)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "test.py", line 73, in test_delete_task
    db.session.commit()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/scoping.py", line 150, in do
    return getattr(self.registry(), name)(*args, **kwargs)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 776, in commit
    self.transaction.commit()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 377, in commit
    self._prepare_impl()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 357, in _prepare_impl
    self.session.flush()
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1919, in flush
    self._flush(objects)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 63, in __exit__
    compat.reraise(type_, value, traceback)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush
    transaction.rollback(_capture_exception=True)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 393, in rollback
    self._assert_active(prepared_ok=True, rollback_ok=True)
  File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 223, in _assert_active
    raise sa_exc.ResourceClosedError(closed_msg)
ResourceClosedError: This transaction is closed
----------------------------------------------------------------------
Ran 1 test in 0.014s
FAILED (errors=1)

我想我发现了问题——问题出在如何设置Celery任务上。如果你从芹菜设置中删除了应用程序上下文调用,一切都会正常运行:

class ContextTask(TaskBase):
    abstract = True
    def __call__(self, *args, **kwargs):
        # deleted --> with app.app_context():
        return TaskBase.__call__(self, *args, **kwargs)

SQLAlchemy文档中有一个关于在after_delete事件期间永远不要修改会话的重大警告:http://docs.sqlalchemy.org/en/latest/orm/events.html#sqlalchemy.orm.events.MapperEvents.after_delete

因此,我怀疑with app.app_context():在删除过程中被调用,试图附加和/或修改Flask SQLAlchemy存储在app对象中的会话,因此整个事情正在爆炸。

Flask SQlAlchemy在幕后为你做了很多魔术,但你可以绕过它,直接使用SQlAlchemy。如果您需要在删除事件期间与数据库对话,您可以创建一个到数据库的新会话:

@celery.task()
def my_task():
    # obviously here I create a new object
    session = db.create_scoped_session()
    session.add(User(id=13, value="random string"))
    session.commit()
    return

但听起来你并不需要这个,你只是想删除一个图像路径。在这种情况下,我会改变你的任务,让它走一条路:

# instance will call the task
@event.listens_for(User, "after_delete")
def after_delete(mapper, connection, target):
    my_task.delay(target.value)
@celery.task()
def my_task(image_path):
    os.remove(image_path) 

希望这会有所帮助——如果这些对你不起作用,请告诉我。感谢您非常详细的设置,它确实有助于调试。

类似于deBrice提出的答案,但使用了类似于Rachel的方法。

class ContextTask(TaskBase):
    abstract = True
    def __call__(self, *args, **kwargs):
        import flask
        # tests will be run in unittest app context
        if flask.current_app:
            return TaskBase.__call__(self, *args, **kwargs)
        else:
            # actual workers need to enter worker app context 
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
芹菜的创建者Ask在github 上提出了这个解决方案
from celery import signals
def make_celery(app):
     ...
     @signals.task_prerun.connect
     def add_task_flask_context(sender, **kwargs):
         if not sender.request.is_eager:
            sender.request.flask_context = app.app_context().__enter__()
    @signals.task_postrun.connect
    def cleanup_task_flask_context(sender, **kwargs):
       flask_context = getattr(sender.request, 'flask_context', None)
       if flask_context is not None:
           flask_context.__exit__(None, None, None)

相关内容

  • 没有找到相关文章

最新更新