当芹菜链完成时执行一些代码



我有以下代码,通过访问网址来启动芹菜链。链参数通过查询参数传递,例如:/process_pipeline/?pipeline=task_a|task_c|task_b 。为了避免启动几个类似的链式任务(例如,如果有人刷新页面),我使用了一个简单的缓存锁定系统。

我在缓存上有一个超时,但我在这里缺少的是一种在链提交时释放缓存的方法。

知道吗?

tasks.py

from __future__ import absolute_import
from celery import shared_task

registry = {}
def register(fn):
    registry[fn.__name__] = fn
@shared_task
def task_a(*args, **kwargs):
    print('task a')
@shared_task
def task_b(*args, **kwargs):
    print('task b')
@shared_task
def task_c(*args, **kwargs):
    print('task c')
register(task_a)
register(task_b)
register(task_c)

views.py

from __future__ import absolute_import
from django.core.cache import cache as memcache
from django.shortcuts import redirect
from django.utils.hashcompat import md5_constructor as md5
from celery import chain
from .tasks import registry

LOCK_EXPIRE = 60 * 5  # Lock expires in 5 minutes

def process_pipeline(request):
    pipeline = request.GET.get('pipeline')
    hexdigest = md5(pipeline).hexdigest()
    lock_id = 'lock-{0}'.format(hexdigest)
    # cache.add fails if if the key already exists
    acquire_lock = lambda: memcache.add(lock_id, None, LOCK_EXPIRE)
    # memcache delete is very slow, but we have to use it to take
    # advantage of using add() for atomic locking
    release_lock = lambda: memcache.delete(lock_id)
    if acquire_lock():
        args = [registry[p].s() for p in pipeline.split('|')]
        task = chain(*args).apply_async()
        memcache.set(lock_id, task.id)
        return redirect('celery-task_status', task_id=task.id)
    else:
        task_id = memcache.get(lock_id)
        return redirect('celery-task_status', task_id=task_id)

from django.conf.urls import patterns, url

urls.py

urlpatterns = patterns('aafilters.views',
    url(r'^process_pipeline/$', 'process_pipeline', name="process_pipeline"),
)

我从来没有用过它,但我认为你应该看看芹菜画布。这似乎是你想要的。

相关内容

  • 没有找到相关文章

最新更新