我目前正在开发一个基于Django-tenants模式的Django应用程序。您不需要查看模块的实际代码,但其想法是,它为当前数据库连接提供了一个全局设置,定义了应用程序租户使用的模式,例如
tenant = tenants_schema.get_tenant()
以及设置
tenants_schema.set_tenant(xxx)
对于的一些任务,我希望他们记住实例化期间选择的当前全局租户,例如理论上:
class AbstractTask(Task):
'''
Run this method before returning the task future
'''
def before_submit(self):
self.run_args['tenant'] = tenants_schema.get_tenant()
'''
This method is run before related .run() task method
'''
def before_run(self):
tenants_schema.set_tenant(self.run_args['tenant'])
在芹菜中有没有一种优雅的做法?
Celery(从3.1起)有一些信号可以用来实现这一点。您可以更改传入的kwargs
,另一方面,在将更改交给实际任务之前撤消更改:
from celery import shared_task
from celery.signals import before_task_publish, task_prerun, task_postrun
from threading import local
current_tenant = local()
@before_task_publish.connect
def add_tenant_to_task(body=None, **unused):
body['kwargs']['tenant_middleware.tenant'] = getattr(current_tenant, 'id', None)
print 'sending tenant: {t}'.format(t=current_tenant.id)
@task_prerun.connect
def extract_tenant_from_task(kwargs=None, **unused):
tenant_id = kwargs.pop('tenant_middleware.tenant', None)
current_tenant.id = tenant_id
print 'current_tenant.id set to {t}'.format(t=tenant_id)
@task_postrun.connect
def cleanup_tenant(**kwargs):
current_tenant.id = None
print 'cleaned current_tenant.id'
@shared_task
def get_current_tenant():
# Here is where you would do work that relied on current_tenant.id being set.
import time
time.sleep(1)
return current_tenant.id
如果您运行任务(不显示工作人员的日志记录):
In [1]: current_tenant.id = 1234; ct = get_current_tenant.delay(); current_tenant.id = 5678; ct.get()
sending tenant: 1234
Out[1]: 1234
In [2]: current_tenant.id
Out[2]: 5678
如果没有发送消息(当您直接调用任务函数时,没有delay()
或apply_async()
),则不会调用信号。如果要对任务名称进行筛选,它在before_task_publish
信号处理程序中可用作body['task']
,而task
对象本身在task_prerun
和task_postrun
处理程序中也可用。
我是一个Celery新手,所以我真的不知道这是否是在Celery中做"中间件"类型的事情的"幸运"方式,但我认为它对我有用。
我不确定你在这里的意思,before_submit是在客户端调用任务之前执行的吗?
在这种情况下,我宁愿在这里使用with语句:
from contextlib import contextmanager
@contextmanager
def set_tenant_db(tenant):
prev_tenant = tenants_schema.get_tenant()
try:
tenants_scheme.set_tenant(tenant)
yield
finally:
tenants_schema.set_tenant(prev_tenant)
@app.task
def tenant_task(tenant=None):
with set_tenant_db(tenant):
do_actions_here()
tenant_task.delay(tenant=tenants_scheme.get_tenant())
当然,您可以创建一个自动执行此操作的基本任务,例如,您可以在Task.__call__
中应用上下文,但我不确定如果可以显式地使用with语句,则可以节省很多费用。