去抖芹任务

  • 本文关键字:任务 python celery
  • 更新时间 :
  • 英文 :


有没有标准的方法去抖芹菜任务?

例如,这样任务就可以多次"启动",但只会在一段时间延迟后运行一次:

def debounce_task(task):
    if task_is_queued(task):
        return
    task.apply_async(countdown=30)

以下是我们使用 Redis 计数器的方法。所有这些都可以在装饰器中推广,但我们只将其用于特定任务(webhooks)

面向公众的任务是您从其他函数调用的任务。它需要在 Redis 中增加一个键。键由函数的参数组成,无论它们是什么(这确保了计数器在各个任务中是唯一的)

@task
def your_public_task(*args, **kwargs):
    cache_key = make_public_task_cache_key(*args, **kwargs)
    get_redis().incr(cache_key)
    _your_task(*args, **kwargs, countdown=settings.QUEUE_DELAY)
请注意,缓存键函数

是共享的(您希望每个函数中使用相同的缓存键)和countdown设置。

然后,执行代码的实际任务执行以下操作:

@task
def _your_task(*args, **kwargs):
    cache_key = make_public_task_cache_key(*args, **kwargs)
    counter = get_redis().getset(cache_key, 0)
    # redis makes the zero a string.
    if counter == '0':
       return
    ... execute your actual task code.

这使您可以在QUEUE_DELAY内多次击中your_public_task.delay(..),并且只会发射一次。

Bartek 有这个想法,使用原子的 Redis 计数器(如果你的代理是 Redis 应该很容易获得)。虽然他的解决方案是跳动,而不是去抖动。虽然差异很小(getset vs decr)。

将任务排队:

conn = get_redis()
conn.incr(key)
task.apply_async(args=args, kwargs=kwargs, countdown=countdown)

然后在任务中:

conn = get_redis()
counter = conn.decr(key)
if counter > 0:
    # task is still queued
    return
# continue on to rest of task
很难

使它成为装饰器,因为您需要装饰任务并调用任务本身。因此,您需要在芹菜@task装饰器之前和之后使用一个装饰器。

现在,我

只是制作了一些帮助我调用任务的函数,以及一个检查任务开始时的函数。

以下是使用Mongo的方法。

注意:我必须使设计更加宽容,因为芹菜任务不能保证在满足etacountdown用完的确切时刻执行。

此外,Mongo 过期索引仅每分钟左右清理一次;因此,您不能围绕在eta启动时删除的记录进行设计。

无论如何,流程是这样的:

  1. 客户端代码调用my_task
  2. preflight递增呼叫计数器,并将其作为flight_id
  3. _my_task设置为在 TTL 秒后执行。
  4. _my_task运行时,它会检查它是否flight_id是否仍然是最新的。如果不是,它将中止。
  5. 。过了一会儿...MONGO 通过过期索引清理集合中的过时条目。

@celery.task(track_started=False, ignore_result=True)
def my_task(my_arg):
    flight_id = preflight(inflight_collection, 'my_task', HASH(my_arg), TTL)
    _my_task.apply_async((my_arg,), {'flight_id':flight_id}, countdown=TTL)
@celery.task(track_started=False, ignore_result=True)
def _my_task(my_arg, flight_id=None):
    if not check_for_takeoff(inflight_collection, 'my_task', HASH(my_arg), flight_id):
        return
    # ... actual work ... #

图书馆代码:

TTL = 5 * 60     # Run tasks after 5 minutes
EXPIRY = 6 * TTL # This needs to be much larger than TTL. 
# We need to store a list of task-executions currently pending
inflight_collection = db['celery_In_Flight']
inflight_collection.create_index([('fn', pymongo.ASCENDING,),
                                  ('key', pymongo.ASCENDING,)])
inflight_collection.create_index('eta', expiresAfterSeconds=EXPIRY)

def preflight(collection, fn, key, ttl):
    eta = datetime.datetime.now() + datetime.timedelta(seconds=ttl)
    result = collection.find_one_and_update({
        'fn': fn,
        'key': key,
    }, {
        '$set': {
            'eta': eta
        },
        '$inc': {
            'flightId': 1
        }
    }, upsert=True, return_document=pymongo.ReturnDocument.AFTER)
    print 'Preflight[{}][{}] = {}'.format(fn, key, result['flightId'])
    return result['flightId']

def check_for_takeoff(collection, fn, key, flight_id):
    result = collection.find_one({
        'fn': fn,
        'key': key
    })
    ready = result is None or result['flightId'] == flight_id
    print 'Check[{}][{}] = {}, {}'.format(fn, key, result['flightId'], ready)
    return ready

这是我提出的解决方案:https://gist.github.com/wolever/3cf2305613052f3810a271e09d42e35c

并复制到这里,供后人参考:

import time
import redis

def get_redis_connection():
    return redis.connect()
class TaskDebouncer(object):
    """ A simple Celery task debouncer.
        Usage::
            def debounce_process_corpus(corpus):
                # Only one task with ``key`` will be allowed to execute at a
                # time. For example, if the task was resizing an image, the key
                # might be the image's URL.
                key = "process_corpus:%s" %(corpus.id, )
                TaskDebouncer.delay(
                    key, my_taks, args=[corpus.id], countdown=0,
                )
            @task(bind=True)
            def process_corpus(self, corpus_id, debounce_key=None):
                debounce = TaskDebouncer(debounce_key, keepalive=30)
                corpus = Corpus.load(corpus_id)
                try:
                    for item in corpus:
                        item.process()
                        # If ``debounce.keepalive()`` isn't called every
                        # ``keepalive`` interval (the ``keepalive=30`` in the
                        # call to ``TaskDebouncer(...)``) the task will be
                        # considered dead and another one will be allowed to
                        # start.
                        debounce.keepalive()
                finally:
                    # ``finalize()`` will mark the task as complete and allow
                    # subsequent tasks to execute. If it returns true, there
                    # was another attempt to start a task with the same key
                    # while this task was running. Depending on your business
                    # logic, this might indicate that the task should be
                    # retried.
                    needs_retry = debounce.finalize()
                if needs_retry:
                    raise self.retry(max_retries=None)
    """
    def __init__(self, key, keepalive=60):
        if key:
            self.key = key.partition("!")[0]
            self.run_key = key
        else:
            self.key = None
            self.run_key = None
        self._keepalive = keepalive
        self.cxn = get_redis_connection()
        self.init()
        self.keepalive()
    @classmethod
    def delay(cls, key, task, args=None, kwargs=None, countdown=30):
        cxn = get_redis_connection()
        now = int(time.time())
        first = cxn.set(key, now, nx=True, ex=countdown + 10)
        if not first:
            now = cxn.get(key)
        run_key = "%s!%s" %(key, now)
        if first:
            kwargs = dict(kwargs or {})
            kwargs["debounce_key"] = run_key
            task.apply_async(args=args, kwargs=kwargs, countdown=countdown)
        return (first, run_key)
    def init(self):
        self.initial = self.key and self.cxn.get(self.key)
    def keepalive(self, expire=None):
        if self.key is None:
            return
        expire = expire if expire is not None else self._keepalive
        self.cxn.expire(self.key, expire)
    def is_out_of_date(self):
        if self.key is None:
            return False
        return self.cxn.get(self.key) != self.initial
    def finalize(self):
        if self.key is None:
            return False
        with self.cxn.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(self.key)
                    if pipe.get(self.key) != self.initial:
                        return True
                    pipe.multi()
                    pipe.delete(self.key)
                    pipe.execute()
                    break
                except redis.WatchError:
                    continue
        return False

这是一个基于 https://stackoverflow.com/a/28157498/4391298 的更完整的解决方案,但变成了装饰器并伸入 Kombu 连接池以重用您的 Redis 计数器。

import logging
from functools import wraps
# Not strictly required
from django.core.exceptions import ImproperlyConfigured
from django.core.cache.utils import make_template_fragment_key
from celery.utils import gen_task_name

LOGGER = logging.getLogger(__name__)

def debounced_task(**options):
    """Debounced task decorator."""
    try:
        countdown = options.pop('countdown')
    except KeyError:
        raise ImproperlyConfigured("Debounced tasks require a countdown")
    def factory(func):
        """Decorator factory."""
        try:
            name = options.pop('name')
        except KeyError:
            name = gen_task_name(app, func.__name__, func.__module__)
        @wraps(func)
        def inner(*args, **kwargs):
            """Decorated function."""
            key = make_template_fragment_key(name, [args, kwargs])
            with app.pool.acquire_channel(block=True) as (_, channel):
                depth = channel.client.decr(key)
                if depth <= 0:
                    try:
                        func(*args, **kwargs)
                    except:
                        # The task failed (or is going to retry), set the
                        # count back to where it was
                        channel.client.set(key, depth)
                        raise
                else:
                    LOGGER.debug("%s calls pending to %s",
                                depth, name)
        task = app._task_from_fun(inner, **options, name=name + '__debounced')
        @wraps(func)
        def debouncer(*args, **kwargs):
            """
            Debouncer that calls the real task.
            This is the task we are scheduling."""
            key = make_template_fragment_key(name, [args, kwargs])
            with app.pool.acquire_channel(block=True) as (_, channel):
                # Mark this key to expire after the countdown, in case our
                # task never runs or runs too many times, we want to clean
                # up our Redis to eventually resolve the issue.
                channel.client.expire(key, countdown + 10)
                depth = channel.client.incr(key)
            LOGGER.debug("Requesting %s in %i seconds (depth=%s)",
                         name, countdown, depth)
            task.si(*args, **kwargs).apply_async(countdown=countdown)
        return app._task_from_fun(debouncer, **options, name=name)
    return factory

相关内容

  • 没有找到相关文章

最新更新