在芹菜中有什么方法可以定义任务配额吗?



我有需求:

  1. 我有一些重资源消耗的任务-导出不同的报告,需要大而复杂的查询,子查询
  2. 用户很多。
  3. 我已经在django中构建了一个项目,并使用芹菜队列任务
  4. 我想限制用户,以便他们每分钟可以请求10个报告。这个想法是他们可以在10分钟内发出数百个请求,但我想让芹菜为一个用户执行10个任务。这样每个用户都有机会。

有什么方法可以让芹菜做到这一点吗?

谢谢

芹菜有一个控制RATE_LIMIT (http://celery.readthedocs.org/en/latest/userguide/tasks.html#Task.rate_limit)的设置,它意味着在一个时间框架内可以运行的任务数量。你可以将其设置为"100/m"(每秒100个),这意味着你的系统允许每秒执行100个任务,需要注意的是,这个设置不是针对用户的,也不是针对任务的,而是针对时间框架的。您是否考虑过这种方法而不是限制每个用户?

为了让每个任务和用户对都有一个'rate_limit',你必须这样做。我认为(不确定)你可以根据自己的需要使用TaskRouter或信号。TaskRouters (http://celery.readthedocs.org/en/latest/userguide/routing.html#routers)允许应用一些逻辑将任务路由到指定队列。信号(http://celery.readthedocs.org/en/latest/userguide/signals.html)允许在任务调度周期的几个明确定义的点上执行代码。

Router逻辑的一个例子可以是:
if task == 'A':
    user_id = args[0]  # in this task the user_id is the first arg
    qty = get_task_qty('A', user_id)
    if qty > LIMIT_FOR_A:
        return
elif task == 'B':
    user_id = args[2]  # in this task the user_id is the seconds arg
    qty = get_task_qty('B', user_id)
    if qty > LIMIT_FOR_B:
        return
return {'queue': 'default'}

使用上面的方法,每次任务启动时,你应该在某些地方(例如Redis)增加user_id/task_type和每次任务完成时,您应该在相同的位置减少该值。

它看起来有点复杂,很难维护,对我来说故障点很少。

我认为另一种适合的方法是为每个用户和任务实现某种"分布式信号量"(类似于分布式锁),因此在每个需要限制任务运行数量的任务中可以使用它。

这个想法是,每次一个应该有"并发控制"的任务启动时,如果不只是返回,它必须检查是否有一些可用的资源。

你可以这样想:

@shared_task
def my_task_A(user_id, arg1, arg2):
    resource_key = 'my_task_A_{}'.format(user_id)
    available = SemaphoreManager.is_available_resource(resource_key)
    if not available:
        # no resources then abort
        return
    try:
        # the resourse could be acquired just before us for other
        if SemaphoreManager.acquire(resource_key):
            #execute your code
    finally:
        SemaphoreManager.release(resource_key)

很难说你应该采用哪种方法,因为这取决于你的应用程序。

希望它能帮助你!

祝你好运!

相关内容

  • 没有找到相关文章

最新更新