SQLAlchemy session issues with Celery



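I have scheduled some recurring tasks for our web application using Celery beat.

The application itself is built with the Pyramid web framework. Sessions are managed with the ZopeTransactionExtension.

In Celery, I use the application as a library. I redefine the session in models with a function.

It works well most of the time, but once in a while it raises InvalidRequestError: This session is in 'prepared' state; no further SQL can be emitted within this transaction

I am not sure what is going wrong or why these warnings are issued.

Sample code:

In tasks.py: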

def initialize_async_session():
    import sqlalchemy
    from webapp.models import Base, set_dbsession, engine
    Session = sqlalchemy.orm.scoped_session(
                sqlalchemy.orm.sessionmaker(autocommit=True, autoflush=True)
                            )
    Session.configure(bind=engine)
    session = Session()
    set_dbsession(session)
    Base.metadata.bind = engine
    return session

@celery.task
def rerun_scheduler():
    log.info("Starting pipeline scheduler")
    session = initialize_async_session()
    webapp.sheduledtask.service.check_for_updates(session)
    log.info("Ending pipeline scheduler")

In models.py of webapp:

DBSession = scoped_session(sessionmaker(bind=engine, expire_on_commit=False,
                    extension=ZopeTransactionExtension()))
def set_dbsession(db_session=None):
    """
    This function sets the db session
    """
    global DBSession
    if db_session:
        DBSession = db_session
        log.info("session changed to {0}".format(db_session))
Update:

Traceback:

Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 551, in __bootstrap_inner
    self.run()
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 54, in new_function
    result = f(*args, **kwargs)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 100, in new_function
    result = f(*args, **kwargs)
  File "/home/ubuntu/modwsgi/env/mvc-service/webapp/webapp/data/mongo_service.py", line 1274, in run
    self.table_params.set_task_status_as_finished()
  File "/home/ubuntu/modwsgi/env/mvc-service/webapp/webapp/mem_objects.py", line 33, in set_task_status_as_finished
    task = Task.get_by_id(self.task_id)
  File "/home/ubuntu/modwsgi/env/mvc-service/webapp/webapp/models.py", line 162, in get_by_id
    return DBSession.query(cls).filter(cls.id == obj_id).first()
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2156, in first
    ret = list(self[0:1])
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2023, in __getitem__
    return list(res)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2227, in __iter__
    return self._execute_and_instances(context)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2240, in _execute_and_instances
    close_with_result=True)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2231, in _connection_from_session
    **kw)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 777, in connection
    close_with_result=close_with_result)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 781, in _connection_for_bind
    return self.transaction._connection_for_bind(engine)
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 289, in _connection_for_bind
    self._assert_is_active()
  File "/home/ubuntu/modwsgi/env/local/lib/python2.7/site-packages/SQLAlchemy-0.7.9-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 217, in _assert_is_active
    "This Session's transaction has been rolled back "
InvalidRequestError: This Session's transaction has been rolled back by a nested rollback() call.  To begin a new transaction, issue Session.rollback() first.
#########################################################################
[2013-05-30 14:32:57,782: WARNING/PoolWorker-3] Exception in thread Thread-4:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 54, in new_function
    result = f(*args, **kwargs)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/edgem_common-0.0-py2.7.egg/common/utils.py", line 100, in new_function
    result = f(*args, **kwargs)
  File "/home/ranjith/wksp/mvc-service/webapp/webapp/data/mongo_service.py", line 1274, in run
    self.table_params.set_task_status_as_finished()
  File "/home/ranjith/wksp/mvc-service/webapp/webapp/mem_objects.py", line 33, in set_task_status_as_finished
    task = Task.get_by_id(self.task_id)
  File "/home/ranjith/wksp/mvc-service/webapp/webapp/models.py", line 166, in get_by_id
    return DBSession.query(cls).filter(cls.id == obj_id).first()
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2145, in first
    ret = list(self[0:1])
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2012, in __getitem__
    return list(res)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2216, in __iter__
    return self._execute_and_instances(context)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2229, in _execute_and_instances
    close_with_result=True)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/query.py", line 2220, in _connection_from_session
    **kw)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 798, in connection
    close_with_result=close_with_result)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 802, in _connection_for_bind
    return self.transaction._connection_for_bind(engine)
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 281, in _connection_for_bind
    self._assert_active()
  File "/home/ranjith/wksp/env/local/lib/python2.7/site-packages/SQLAlchemy-0.8.1-py2.7-linux-x86_64.egg/sqlalchemy/orm/session.py", line 181, in _assert_active
    "This session is in 'prepared' state; no further "
InvalidRequestError: This session is in 'prepared' state; no further SQL can be emitted within this transaction.

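I believe the problem is that you are trying to use the web application's SQLAlchemy session inside your Celery tasks.

The first thing I would recommend is creating two separate scoped sessions, one for your Celery application and another for your web application. Next, I would make sure the Celery database session is configured only once, during Celery initialization. You can use Celery's worker_init.connect to make sure the session is set up during Celery startup (http://hynek.me/articles/using-celery-with-pyramid/).

It is very important that your web application does not use the same database session as your Celery application.

Something like this for your tasks.py file: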

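import logging

from celery import Celery
from celery.signals import worker_init
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

import webapp.sheduledtask.service

log = logging.getLogger(__name__)
celery = Celery('tasks')  # placeholder app name

# Session used only by the Celery workers, never by the web application
Session = scoped_session(sessionmaker(autocommit=True, autoflush=True))

@worker_init.connect
def initialize_session(**kwargs):
    # Celery signal handlers must accept **kwargs.
    # Bind the engine once, at worker startup.
    some_engine = create_engine('database_url')
    Session.configure(bind=some_engine)

@celery.task
def rerun_scheduler():
    log.info("Starting pipeline scheduler")
    webapp.sheduledtask.service.check_for_updates(Session)
    log.info("Ending pipeline scheduler")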
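
To make the advice above more concrete, here is a minimal sketch of a worker-only scoped session that is discarded after each task. It assumes a recent SQLAlchemy (1.4 or later) and uses an in-memory SQLite URL purely for illustration. WorkerSession, configure_worker_session, task_session and run_example_task are hypothetical names, not part of the original code. The autocommit=True flag is left out because SQLAlchemy 2.0 no longer accepts it.

```python
from contextlib import contextmanager

from sqlalchemy import create_engine, text
from sqlalchemy.orm import scoped_session, sessionmaker

# Registry for the worker side only; the web app keeps its own DBSession.
WorkerSession = scoped_session(sessionmaker())


def configure_worker_session(url):
    """Bind the worker registry once, e.g. from a worker start-up hook."""
    engine = create_engine(url)
    WorkerSession.configure(bind=engine)
    return engine


@contextmanager
def task_session():
    """Yield a session for one task and always discard it afterwards."""
    session = WorkerSession()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        # Drop the thread-local session so the next task starts clean.
        WorkerSession.remove()


def run_example_task():
    """Stand-in for a task body (hypothetical); runs one trivial query."""
    with task_session() as session:
        return session.execute(text("SELECT 1")).scalar()


# In-memory SQLite, used here only so the sketch runs without a server.
configure_worker_session("sqlite://")
print(run_example_task())
```

In a real worker, configure_worker_session would be called from the worker start-up hook shown above, and each task body would wrap its database work in task_session(), so that a failed transaction in one task cannot leak into the next.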

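Cross-posting my answer to a very similar Stack Overflow question: What is the correct way to use a SQLAlchemy session in Celery?

This fixed the problem for me:

By default, SQLAlchemy pools connections in a way that is not safe to share across forked processes, and Celery forks its worker processes by default, so one of the two has to be changed.

Turn off SQLAlchemy pooling

SQLAlchemy docs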

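from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

# NullPool opens a fresh connection per use, so forked workers share nothing.
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, poolclass=NullPool
)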
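
As a rough illustration of what turning off pooling changes, the sketch below counts how many raw DBAPI connections are opened for a few checkouts with and without a pool. It assumes an in-memory SQLite database, chosen only so the example runs anywhere. count_dbapi_connects is a hypothetical helper, not something from the original answer.

```python
from sqlalchemy import create_engine, event, text
from sqlalchemy.pool import NullPool, QueuePool


def count_dbapi_connects(poolclass, checkouts=3):
    """Return how many raw DBAPI connections are opened for `checkouts` uses."""
    engine = create_engine("sqlite://", poolclass=poolclass)
    opened = []

    # The "connect" pool event fires once per newly created DBAPI connection.
    @event.listens_for(engine, "connect")
    def _on_connect(dbapi_connection, connection_record):
        opened.append(dbapi_connection)

    for _ in range(checkouts):
        with engine.connect() as conn:
            conn.execute(text("SELECT 1"))
    engine.dispose()
    return len(opened)


pooled = count_dbapi_connects(QueuePool)   # one connection, reused from the pool
unpooled = count_dbapi_connects(NullPool)  # a new connection for every checkout
print(pooled, unpooled)
```

With NullPool nothing is kept open between uses, so a forked worker has no inherited connection to share with its parent. The trade-off is the cost of opening a new connection for every checkout.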
