我有以下代码,通过访问网址来启动芹菜链。链参数通过查询参数传递,例如:/process_pipeline/?pipeline=task_a|task_c|task_b
。为了避免启动几个类似的链式任务(例如,如果有人刷新页面),我使用了一个简单的缓存锁定系统。
我在缓存上有一个超时,但我在这里缺少的是一种在链提交时释放缓存的方法。
知道吗?
tasks.py
from __future__ import absolute_import
from celery import shared_task
registry = {}
def register(fn):
registry[fn.__name__] = fn
@shared_task
def task_a(*args, **kwargs):
print('task a')
@shared_task
def task_b(*args, **kwargs):
print('task b')
@shared_task
def task_c(*args, **kwargs):
print('task c')
register(task_a)
register(task_b)
register(task_c)
views.py
from __future__ import absolute_import
from django.core.cache import cache as memcache
from django.shortcuts import redirect
from django.utils.hashcompat import md5_constructor as md5
from celery import chain
from .tasks import registry
LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
def process_pipeline(request):
pipeline = request.GET.get('pipeline')
hexdigest = md5(pipeline).hexdigest()
lock_id = 'lock-{0}'.format(hexdigest)
# cache.add fails if if the key already exists
acquire_lock = lambda: memcache.add(lock_id, None, LOCK_EXPIRE)
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
release_lock = lambda: memcache.delete(lock_id)
if acquire_lock():
args = [registry[p].s() for p in pipeline.split('|')]
task = chain(*args).apply_async()
memcache.set(lock_id, task.id)
return redirect('celery-task_status', task_id=task.id)
else:
task_id = memcache.get(lock_id)
return redirect('celery-task_status', task_id=task_id)
from django.conf.urls import patterns, url
urls.py
urlpatterns = patterns('aafilters.views',
url(r'^process_pipeline/$', 'process_pipeline', name="process_pipeline"),
)
我从来没有用过它,但我认为你应该看看芹菜画布。这似乎是你想要的。