Django芹菜任务保持全局状态



我目前正在开发一个基于Django-tenants模式的Django应用程序。您不需要查看模块的实际代码,但其想法是,它为当前数据库连接提供了一个全局设置,定义了应用程序租户使用的模式,例如

tenant = tenants_schema.get_tenant()

以及设置

tenants_schema.set_tenant(xxx)

对于的一些任务,我希望他们记住实例化期间选择的当前全局租户,例如理论上:

class AbstractTask(Task):
    '''
    Run this method before returning the task future
    '''
    def before_submit(self):
         self.run_args['tenant'] = tenants_schema.get_tenant()
    '''
    This method is run before related .run() task method
    '''
    def before_run(self):
         tenants_schema.set_tenant(self.run_args['tenant'])

在芹菜中有没有一种优雅的做法?

Celery(从3.1起)有一些信号可以用来实现这一点。您可以更改传入的kwargs,另一方面,在将更改交给实际任务之前撤消更改:

from celery import shared_task
from celery.signals import before_task_publish, task_prerun, task_postrun
from threading import local
current_tenant = local()
@before_task_publish.connect
def add_tenant_to_task(body=None, **unused):
    body['kwargs']['tenant_middleware.tenant'] = getattr(current_tenant, 'id', None)
    print 'sending tenant: {t}'.format(t=current_tenant.id)

@task_prerun.connect
def extract_tenant_from_task(kwargs=None, **unused):
    tenant_id = kwargs.pop('tenant_middleware.tenant', None)
    current_tenant.id = tenant_id
    print 'current_tenant.id set to {t}'.format(t=tenant_id)

@task_postrun.connect
def cleanup_tenant(**kwargs):
    current_tenant.id = None
    print 'cleaned current_tenant.id'

@shared_task
def get_current_tenant():
    # Here is where you would do work that relied on current_tenant.id being set.
    import time
    time.sleep(1) 
    return current_tenant.id

如果您运行任务(不显示工作人员的日志记录):

In [1]: current_tenant.id = 1234; ct = get_current_tenant.delay(); current_tenant.id = 5678; ct.get()
sending tenant: 1234
Out[1]: 1234
In [2]: current_tenant.id
Out[2]: 5678

如果没有发送消息(当您直接调用任务函数时,没有delay()apply_async()),则不会调用信号。如果要对任务名称进行筛选,它在before_task_publish信号处理程序中可用作body['task'],而task对象本身在task_preruntask_postrun处理程序中也可用。

我是一个Celery新手,所以我真的不知道这是否是在Celery中做"中间件"类型的事情的"幸运"方式,但我认为它对我有用。

我不确定你在这里的意思,before_submit是在客户端调用任务之前执行的吗?

在这种情况下,我宁愿在这里使用with语句:

from contextlib import contextmanager
@contextmanager
def set_tenant_db(tenant):
    prev_tenant = tenants_schema.get_tenant()
    try:
        tenants_scheme.set_tenant(tenant)
        yield
    finally:
        tenants_schema.set_tenant(prev_tenant)

@app.task
def tenant_task(tenant=None):
    with set_tenant_db(tenant):
        do_actions_here()

tenant_task.delay(tenant=tenants_scheme.get_tenant())

当然,您可以创建一个自动执行此操作的基本任务,例如,您可以在Task.__call__中应用上下文,但我不确定如果可以显式地使用with语句,则可以节省很多费用。

相关内容

  • 没有找到相关文章

最新更新