我正在寻找一种方法来限制何时调用函数,但仅当输入参数不同时,即:
@app.task(rate_limit="60/s")
def api_call(user):
do_the_api_call()
for i in range(0,100):
api_call("antoine")
api_call("oscar")
因此,我希望 每秒也被调用60次我该怎么做有什么帮助吗? --编辑日期:2015年4月27日我曾尝试在任务中调用rate_limit的子任务,但它也不起作用:rate_limit总是应用于所有实例化的子任务或任务(这是合乎逻辑的)。 最好!api_call("antoine")
每秒被调用60次,api_call("oscar")
@app.task(rate_limit="60/s")
def sub_api_call(user):
do_the_api_call()
@app.task
def api_call(user):
sub_api_call(user)
for i in range(0,100):
api_call("antoine")
api_call("oscar")
更新
请参阅评论部分,以获取一个更好的方法的链接,该方法包含了这里的大部分内容,但修复了这里版本存在的乒乓球问题。这里的版本天真地重试任务。也就是说,它只是稍后再次尝试,并带有一些抖动。如果你有1000个任务都在排队,这会造成混乱,因为它们都在争夺下一个可用的位置。他们都只是乒乓球进出任务工作者,经过数百次尝试,最终才有机会被运行。
我没有做那种天真的方法,而是尝试了一种指数后退,每次任务被抑制时,它的后退时间都会比以前长一点。这个概念是可行的,但它需要存储每个任务的重试次数,这很烦人,必须集中,而且也不是最佳的,因为在等待计划任务运行时,可能会出现长时间的无活动延迟。(想象一下,一个任务被限制了第50次,必须等待一个小时,而一个限制计时器在重新安排时间后几秒钟就过期了。在这种情况下,工作人员在等待该任务运行时会空闲一个小时。)
尝试这种方法的更好方法是使用调度器,而不是简单的重试或指数回退。注释部分中链接的更新版本维护了一个基本调度程序,该调度程序知道何时重试任务。它跟踪任务被抑制的顺序,并知道下一个运行任务的窗口何时出现。因此,想象一个任务分钟的节流阀,时间线如下:
00:00:00 - Task 1 is attempted and begins running
00:00:01 - Task 2 is attempted. Oh no! It gets throttled. The current
throttle expires at 00:01:00, so it is rescheduled then.
00:00:02 - Task 3 is attempted. Oh no! It gets throttled. The current
throttle expires at 00:01:00, but something is already
scheduled then, so it is rescheduled for 00:02:00.
00:01:00 - Task 2 attempts to run again. All clear! It runs.
00:02:00 - Task 3 attempts to run again. All clear! It runs.
换句话说,根据积压工作的长度,它将在当前限制到期后重新安排任务,并且所有其他重新安排、限制的任务都有机会运行。(这花了数周时间才弄清楚。)
原始答案
我今天花了一些时间在这方面,并提出了一个很好的解决方案。所有其他解决方案都存在以下问题之一:
- 它们要求任务进行无限次重试,从而使celeni的重试机制变得无用
- 它们不会根据参数进行节流
- 它在多个工作人员或队列中失败
- 它们笨重等等
基本上,你的任务是这样完成的:
@app.task(bind=True, max_retries=10)
@throttle_task("2/s", key="domain", jitter=(2, 15))
def scrape_domain(self, domain):
do_stuff()
结果是,每个域参数将任务限制为每秒2次,随机重试抖动在2-15秒之间。key
参数是可选的,但与任务中的参数相对应。如果没有给出关键参数,它只会将任务限制到给定的速率。如果提供了它,那么油门将应用于(任务,键)二元。
另一种方法是不使用装饰器。这给了你更多的灵活性,但要靠你自己去尝试。代替上面的,你可以做:
@app.task(bind=True, max_retries=10)
def scrape_domain(self, domain):
proceed = is_rate_okay(self, "2/s", key=domain)
if proceed:
do_stuff()
else:
self.request.retries = task.request.retries - 1 # Don't count this as against max_retries.
return task.retry(countdown=random.uniform(2, 15))
我认为这与第一个例子相同。长了一点,更有分支,但更清楚地展示了它的工作原理。我希望自己能一直使用装饰师。
这一切都是通过在redis中进行统计来实现的。实现非常简单。您可以在redis中为任务创建一个密钥(以及密钥参数,如果给定),然后根据提供的时间表使redis密钥过期。如果用户将速率设置为10/m,则会为60s设置一个redis键,并且每次尝试使用正确名称的任务时都会递增该键。如果您的增量过高,请重试该任务。否则,运行它。
def parse_rate(rate: str) -> Tuple[int, int]:
"""
Given the request rate string, return a two tuple of:
<allowed number of requests>, <period of time in seconds>
(Stolen from Django Rest Framework.)
"""
num, period = rate.split("/")
num_requests = int(num)
if len(period) > 1:
# It takes the form of a 5d, or 10s, or whatever
duration_multiplier = int(period[0:-1])
duration_unit = period[-1]
else:
duration_multiplier = 1
duration_unit = period[-1]
duration_base = {"s": 1, "m": 60, "h": 3600, "d": 86400}[duration_unit]
duration = duration_base * duration_multiplier
return num_requests, duration
def throttle_task(
rate: str,
jitter: Tuple[float, float] = (1, 10),
key: Any = None,
) -> Callable:
"""A decorator for throttling tasks to a given rate.
:param rate: The maximum rate that you want your task to run. Takes the
form of '1/m', or '10/2h' or similar.
:param jitter: A tuple of the range of backoff times you want for throttled
tasks. If the task is throttled, it will wait a random amount of time
between these values before being tried again.
:param key: An argument name whose value should be used as part of the
throttle key in redis. This allows you to create per-argument throttles by
simply passing the name of the argument you wish to key on.
:return: The decorated function
"""
def decorator_func(func: Callable) -> Callable:
@functools.wraps(func)
def wrapper(*args, **kwargs) -> Any:
# Inspect the decorated function's parameters to get the task
# itself and the value of the parameter referenced by key.
sig = inspect.signature(func)
bound_args = sig.bind(*args, **kwargs)
task = bound_args.arguments["self"]
key_value = None
if key:
try:
key_value = bound_args.arguments[key]
except KeyError:
raise KeyError(
f"Unknown parameter '{key}' in throttle_task "
f"decorator of function {task.name}. "
f"`key` parameter must match a parameter "
f"name from function signature: '{sig}'"
)
proceed = is_rate_okay(task, rate, key=key_value)
if not proceed:
logger.info(
"Throttling task %s (%s) via decorator.",
task.name,
task.request.id,
)
# Decrement the number of times the task has retried. If you
# fail to do this, it gets auto-incremented, and you'll expend
# retries during the backoff.
task.request.retries = task.request.retries - 1
return task.retry(countdown=random.uniform(*jitter))
else:
# All set. Run the task.
return func(*args, **kwargs)
return wrapper
return decorator_func
def is_rate_okay(task: Task, rate: str = "1/s", key=None) -> bool:
"""Keep a global throttle for tasks
Can be used via the `throttle_task` decorator above.
This implements the timestamp-based algorithm detailed here:
https://www.figma.com/blog/an-alternative-approach-to-rate-limiting/
Basically, you keep track of the number of requests and use the key
expiration as a reset of the counter.
So you have a rate of 5/m, and your first task comes in. You create a key:
celery_throttle:task_name = 1
celery_throttle:task_name.expires = 60
Another task comes in a few seconds later:
celery_throttle:task_name = 2
Do not update the ttl, it now has 58s remaining
And so forth, until:
celery_throttle:task_name = 6
(10s remaining)
We're over the threshold. Re-queue the task for later. 10s later:
Key expires b/c no more ttl.
Another task comes in:
celery_throttle:task_name = 1
celery_throttle:task_name.expires = 60
And so forth.
:param task: The task that is being checked
:param rate: How many times the task can be run during the time period.
Something like, 1/s, 2/h or similar.
:param key: If given, add this to the key placed in Redis for the item.
Typically, this will correspond to the value of an argument passed to the
throttled task.
:return: Whether the task should be throttled or not.
"""
key = f"celery_throttle:{task.name}{':' + str(key) if key else ''}"
r = make_redis_interface("CACHE")
num_tasks, duration = parse_rate(rate)
# Check the count in redis
count = r.get(key)
if count is None:
# No key. Set the value to 1 and set the ttl of the key.
r.set(key, 1)
r.expire(key, duration)
return True
else:
# Key found. Check it.
if int(count) <= num_tasks:
# We're OK, run it.
r.incr(key, 1)
return True
else:
return False
我认为用Celery内置的任务限制器不可能实现这一点。
假设您正在为API使用某种缓存,最好的解决方案可能是创建任务名称和参数的哈希,并将该密钥用于基于缓存的节流器。
如果你使用Redis,你可以设置一个60秒超时的锁,或者使用增量计数器来计算每分钟的调用次数。
这篇文章可能会给你一些关于Redis:对Celery任务的分布式节流的建议
https://callhub.io/distributed-rate-limiting-with-redis-and-celery/