pyramid ZopeTransactionExtension with celener:如何在web请求后立即提交事



这是我的情况。

我正在构建一个RESTful Web服务,该服务从客户端接收数据,然后根据这些数据创建一个Event,然后我想将这个新Event推送到芹菜中以异步处理它。

我用金字塔来构建RESTful Web服务,用pyramid_celery来制作金字塔&芹菜一起吃。

这里是我的观点的源代码:

# views.py
# This code recive data from client, then create a new record Event from this
posted_data = schema.deserialize(request.POST.mixed())
e = Event()
e.__dict__.update(posted_data)
DBSession.add(e)
transaction.commit()
print "Commited #%d" % e.id # code mark 01
fire_event.delay(e.id) # fire_event is a celery task
logging.getLogger(__name__).info('Add event #%d to tasks' % e.id)

这是我任务的源代码:

# tasks.py
@celery.task()
def fire_event(event_id):
    e = DBSession.query(Event).get(event_id)
    if e is None:
        return
    print "Firing event %d#%s" % (event_id, e)
    logger.info("Firing event %d#%s", event_id, e)

如果我使用金字塔的炼金术脚手架中的默认代码,则会在代码标记01行引发异常。像这样的例外:

DetachedInstanceError: Instance <Event at ...> is not bound to a Session; ...

从ZopeAlchemy文档中,为了避免这个异常,我这样配置DBSession:

# models.py
DBSession = scoped_session(sessionmaker(
                extension=ZopeTransactionExtension(keep_session=True)
            ))

现在我的问题是在我的RESTful请求完成后,与MySQL服务器的金字塔保持事务。RESTful请求完成后,我转到MySQL Server并运行命令:

SHOW engine innodb status;

从它的结果来看,我看到了:

--TRANSACTION 180692, ACTIVE 84 sec
MySQL thread id 94, OS thread handle 0x14dc, query id 1219 [domain] [ip] [project name] cleaning up
Trx read view will not see trx with id >= 180693, sees < 180693

这意味着Pyramid仍然保持连接,这没关系,但Pyramid也开始交易,这是个问题。当我尝试使用另一个工具访问我的MySQL Server时,这个事务可能会使我处于锁定状态。

我的问题是:

如何让Pyramid在RESTful请求完成后立即关闭事务。如果我做不到,我的处境还有别的解决办法吗

非常感谢。

Celery保持着一种"透明"地将代码作为任务运行的错觉——用@task装饰函数,然后使用my_function.delay(),一切都会神奇地工作。

事实上,意识到这一点有点棘手,你的代码在一个完全不同的过程中运行,可能在另一台机器上运行,可能几分钟/几小时后,并且该过程中不存在Pyramid请求/响应周期,因此ZopeTransactionExtension不能在请求完成时用于在工作过程中自动提交事务,因为没有请求,只有一个长期运行的工作进程。

因此,这不是让未完成的交易悬而未决的金字塔,而是你的工人流程。当您调用e = DBSession.query(Event).get(event_id)时,事务由SQLAlchemy启动,并且永远不会完成。

在这里,我为一个类似的问题写了一个更长的答案,并提供了更多细节:https://stackoverflow.com/a/16346587/320021-重点是为您的工作进程使用不同的会话

另一件事是,最好避免在Pyramid代码中使用transaction.commit(),因为对象会过期和其他丑陋的东西。在金字塔中,可以在请求完成后调用一个函数——我写了一个函数,它注册了一个回调,从那里调用一个芹菜任务:

from repoze.tm import after_end
import transaction
def invoke_task_after_commit(task_fn, task_args, task_kwargs):
    """
    This should ONLY be used within the web-application process managed by repoze.tm2
    otherwise a memory leak will result. See http://docs.repoze.org/tm2/#cleanup
    for more details.
    """
    t = transaction.get()  # the current transaction
    def invoke():
        task_fn.apply_async(
            args=task_args,
            kwargs=task_kwargs,
        )
    after_end.register(invoke, t)

(我从函数中删除了很多不相关的代码,因此可能会有拼写错误等。视为伪代码)

相关内容

  • 没有找到相关文章

最新更新