I have a fairly straightforward piece of code: the IDs come from a file, so I load them into a list, then iterate over every ID in the list and call an API, passing the ID value and dumping the API response content to a file.
I want to speed this process up with parallel API calls, but the API server allows at most 5 calls per second. Another key consideration is that the API calls are slow, taking about 10 seconds on average to complete.
I would like to have multiple parallel processes with some way of ensuring that at most 5 calls happen in any one second.
Here is the current code:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed

ids = pd.read_csv('data.csv')
ids = ids['Id'].values.tolist()

def dump_data(df, idx):
    filename = base_dir + '\\' + str(idx) + '.csv'
    df.to_csv(filename, header=True, index=False)  # write data to file

def get_api(idx):
    data = call_some_api(idx)  # API returns data as a pandas DataFrame; takes about 10 secs
    dump_data(data, idx)

Parallel(n_jobs=10, verbose=50)(delayed(get_api)(idx) for idx in ids)
I am currently using joblib, but I am happy to switch if there is a better library for this.
How can I make sure that no more than 5 requests are issued in any one second (while still completing all the requests as quickly as possible)?
Also, I am using Python 3.9 on Windows.
Update 2
After further thought: depending on your needs, use a standard multithreading or multiprocessing pool and pass a CallingThrottle instance to your worker function (indirectly as a global or explicitly as an argument). The instance's throttle method can then be invoked by the worker function at the precise point where throttling needs to happen (right before issuing the request to the website). Passing the throttle instance explicitly as an argument to the worker function should make this usable with joblib (although I believe that in your case you only need a multithreading pool).
For example:
from multiprocessing.pool import ThreadPool, Pool
from multiprocessing.managers import BaseManager
from threading import Lock
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()
        self.lock = Lock()

    def throttle(self):
        with self.lock:
            while len(self.called_timestamps) == self.nb_call_times_limit:
                now = time.time()
                self.called_timestamps = list(filter(
                    lambda x: now - x < self.expired_time,
                    self.called_timestamps
                ))
                if len(self.called_timestamps) == self.nb_call_times_limit:
                    time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                    time.sleep(time_to_sleep)
            self.called_timestamps.append(time.time())

# A "managed" CallingThrottle is required for use with multiprocessing:
class CallingThrottleManager(BaseManager):
    pass

CallingThrottleManager.register('CallingThrottle', CallingThrottle)

def init_pool(throttle):
    global calling_throttle
    calling_throttle = throttle

def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 5 of these per second.
    """
    from datetime import datetime
    calling_throttle.throttle()
    print(datetime.now(), 'x =', x)
    time.sleep(10)
    return x, x * x

def main():
    # Multithreading example:
    calling_throttle = CallingThrottle(5, 1)  # 5 calls every 1 second
    pool = ThreadPool(20)
    init_pool(calling_throttle)
    start = time.time()
    results = pool.map(worker, range(20))
    print('Total elapsed time:', time.time() - start)
    pool.close()
    pool.join()

    print('\n', '-' * 30, '\n', sep='')

    # Multiprocessing example:
    with CallingThrottleManager() as manager:
        calling_throttle = manager.CallingThrottle(5, 1)  # 5 calls every 1 second
        pool = Pool(20, initializer=init_pool, initargs=(calling_throttle,))
        start = time.time()
        results = pool.map(worker, range(20))
        print('Total elapsed time:', time.time() - start)
        pool.close()
        pool.join()

if __name__ == '__main__':
    main()
Using the throttle with joblib:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed
from multiprocessing.managers import BaseManager
from threading import Lock
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()
        self.lock = Lock()

    def throttle(self):
        with self.lock:
            while len(self.called_timestamps) == self.nb_call_times_limit:
                now = time.time()
                self.called_timestamps = list(filter(
                    lambda x: now - x < self.expired_time,
                    self.called_timestamps
                ))
                if len(self.called_timestamps) == self.nb_call_times_limit:
                    time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                    time.sleep(time_to_sleep)
            self.called_timestamps.append(time.time())

# A "managed" CallingThrottle is required for use with multiprocessing:
class CallingThrottleManager(BaseManager):
    pass

def dump_data(df, idx):
    filename = base_dir + '\\' + str(idx) + '.csv'
    df.to_csv(filename, header=True, index=False)  # write data to file

def get_api(calling_throttle, idx):
    calling_throttle.throttle()
    data = call_some_api(idx)  # API returns data as a pandas DataFrame; takes about 10 secs
    dump_data(data, idx)

def main():
    ids = pd.read_csv('data.csv')
    ids = ids['Id'].values.tolist()

    CallingThrottleManager.register('CallingThrottle', CallingThrottle)
    with CallingThrottleManager() as manager:
        calling_throttle = manager.CallingThrottle(5, 1)  # 5 calls every 1 second
        Parallel(n_jobs=10, verbose=50)(
            delayed(get_api)(calling_throttle, idx) for idx in ids
        )

if __name__ == '__main__':
    main()
Update 1
I originally implemented the rate limiting algorithm mentioned in @balmy's comment, and it was noticed that the rate could sometimes be exceeded. @mindvirus commented on this phenomenon in that thread, where the OP was trying to send 5 messages every 8 seconds:
"This is good, but it can exceed the rate. Let's say at time 0 you forward 5 messages; then at times N * (8/5) for N = 1, 2, ... you can send another message, resulting in more than 5 messages in an 8-second period."
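For context, that limiter is the classic allowance-based algorithm discussed in that thread. A minimal sketch of it (my own reconstruction; variable and function names are mine):

import time

rate = 5.0  # allowed calls...
per = 8.0   # ...per this many seconds
allowance = rate
last_check = time.time()

def try_send():
    """Return True if a call is allowed right now, else False (caller waits and retries)."""
    global allowance, last_check
    current = time.time()
    allowance += (current - last_check) * (rate / per)  # refill at rate/per per second
    last_check = current
    if allowance > rate:
        allowance = rate  # cap bursts at `rate`
    if allowance < 1.0:
        return False
    allowance -= 1.0
    return True

After an initial burst of 5 at time 0, the allowance refills at rate/per = 0.625 per second, so try_send succeeds again at t ≈ 1.6 s; that yields more than 5 calls inside the first 8-second window, exactly as the comment describes.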
So I am now using a new rate limiting algorithm.
I have created two classes, RateLimitedProcessPool and RateLimitedThreadPool, for multiprocessing and multithreading respectively. These classes are like the standard multiprocessing.pool.Pool and multiprocessing.pool.ThreadPool classes, except that their __init__ methods take two extra keyword arguments, rate and per, which together specify the maximum rate at which the apply_async method can be called. For example, the values rate=7 and per=3 mean that successive calls to apply_async will be throttled so as to allow a maximum rate of 7 calls every 3 seconds.
The following code demonstrates this with a simple worker function that emulates the OP's situation: the worker function takes 10 seconds to execute and must be limited to a maximum rate of 5 calls per second. We need to invoke this function 20 times, so the best performance we can achieve is a total running time of approximately 13 seconds.
import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps
import time

class CallingThrottle:
    def __init__(self, nb_call_times_limit, expired_time):
        self.nb_call_times_limit = nb_call_times_limit
        self.expired_time = expired_time
        self.called_timestamps = list()

    def __enter__(self):
        while len(self.called_timestamps) == self.nb_call_times_limit:
            now = time.time()
            self.called_timestamps = list(filter(
                lambda x: now - x < self.expired_time,
                self.called_timestamps
            ))
            if len(self.called_timestamps) == self.nb_call_times_limit:
                time_to_sleep = self.called_timestamps[0] + self.expired_time - now
                time.sleep(time_to_sleep)
        self.called_timestamps.append(time.time())

    def __exit__(self, *exc):
        pass

class RateLimitedPool:
    def __init__(self, rate, per):
        self.calling_throttle = CallingThrottle(rate, per)
        self.first_time = True

    def apply_async(self, *args, **kwargs):
        # There could be a lag between the first call to apply_async
        # and the first task actually starting, so set the first time
        # after the call to apply_async:
        if self.first_time:
            self.first_time = False
            async_result = super().apply_async(*args, **kwargs)
            with self.calling_throttle:
                pass
            return async_result
        else:
            with self.calling_throttle:
                return super().apply_async(*args, **kwargs)

class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)
class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

########################################

def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 5 of these per second.
    """
    from datetime import datetime
    print(datetime.now(), 'x =', x)
    time.sleep(10)
    return x, x * x

def main():
    args = range(20)
    pool = RateLimitedThreadPool(20, rate=5, per=1)  # 5 per second
    start = time.time()
    for x in args:
        pool.apply_async(worker, args=(x,))
    # Wait for all tasks to complete:
    pool.close()
    pool.join()
    print('Total elapsed time:', time.time() - start)

if __name__ == '__main__':
    main()
Prints:
2021-10-03 07:19:48.002628 x = 0
2021-10-03 07:19:48.002628 x = 1
2021-10-03 07:19:48.002628 x = 3
2021-10-03 07:19:48.002628 x = 4
2021-10-03 07:19:48.002628 x = 2
2021-10-03 07:19:49.005625 x = 5
2021-10-03 07:19:49.005625 x = 6
2021-10-03 07:19:49.005625 x = 8
2021-10-03 07:19:49.005625 x = 7
2021-10-03 07:19:49.005625 x = 9
2021-10-03 07:19:50.008775 x = 10
2021-10-03 07:19:50.008775 x = 11
2021-10-03 07:19:50.008775 x = 13
2021-10-03 07:19:50.008775 x = 12
2021-10-03 07:19:50.008775 x = 14
2021-10-03 07:19:51.012774 x = 15
2021-10-03 07:19:51.012774 x = 16
2021-10-03 07:19:51.012774 x = 17
2021-10-03 07:19:51.012774 x = 18
2021-10-03 07:19:51.012774 x = 19
Total elapsed time: 13.015560150146484
CPU-Intensive Example
In the following example I am using a RateLimitedProcessPool, since my worker function is 100% CPU-bound and takes approximately 10 seconds to execute on my desktop. I only have 8 logical cores (4 physical cores), so my pool size is 8, and for this demo I am submitting 8 tasks at a rate of 3 per second. The second group of 3 tasks starts approximately 1 second after the first 3, and the final 2 tasks 1 second after that. Because the number of physical cores becomes a limiting factor, the total running time is a bit over 21 seconds.
import multiprocessing.pool
import multiprocessing
import threading
from functools import wraps
import time

class RateLimitedPool:
    # There is a lag between the first call to apply_async and the first task actually starting:
    LAG_TIME = .2  # seconds; needs to be fine-tuned

    def __init__(self, rate, per):
        assert isinstance(rate, int) and rate > 0
        assert isinstance(per, (int, float)) and per > 0
        self.rate = rate
        self.per = per
        self.count = 0
        self.start_time = None
        self.first_time = True

    def _check_allowed(self):
        current_time = time.time()
        if self.start_time is None:
            self.start_time = current_time
            self.count = 1
            return True
        elapsed_time = current_time - self.start_time
        if self.first_time:
            elapsed_time -= self.LAG_TIME
        if elapsed_time >= self.per:
            self.start_time = current_time
            self.count = 1
            self.first_time = False
            return True
        if self.count < self.rate:
            self.count += 1
            return True
        return False

    def apply_async(self, *args, **kwargs):
        while not self._check_allowed():
            time.sleep(.1)  # This can be fine-tuned
        return super().apply_async(*args, **kwargs)

class RateLimitedProcessPool(RateLimitedPool, multiprocessing.pool.Pool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.Pool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

class RateLimitedThreadPool(RateLimitedPool, multiprocessing.pool.ThreadPool):
    def __init__(self, *args, rate=5, per=1, **kwargs):
        multiprocessing.pool.ThreadPool.__init__(self, *args, **kwargs)
        RateLimitedPool.__init__(self, rate, per)

def threadpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

def processpool(pool):
    def decorate(f):
        @wraps(f)
        def wrap(*args, **kwargs):
            return pool.apply_async(f, args, kwargs)
        return wrap
    return decorate

########################################

ONE_SECOND_ITERATIONS = 20_000_000

def one_second():
    sum = 0
    for _ in range(ONE_SECOND_ITERATIONS):
        sum += 1
    return sum

def worker(x):
    """
    Emulate a task that takes 10 seconds to execute.
    Cannot run more than 3 of these per second.
    """
    from datetime import datetime
    print(datetime.now(), 'x =', x)
    for _ in range(10):
        one_second()
    return x, x * x

def main():
    args = range(8)
    pool = RateLimitedProcessPool(8, rate=3, per=1)  # 3 per second
    start = time.time()
    for x in args:
        pool.apply_async(worker, args=(x,))
    # Wait for all tasks to complete:
    pool.close()
    pool.join()
    print('Total elapsed time:', time.time() - start)

if __name__ == '__main__':
    main()
Prints:
2021-10-03 09:51:32.857166 x = 0
2021-10-03 09:51:32.859168 x = 1
2021-10-03 09:51:32.864166 x = 2
2021-10-03 09:51:33.899890 x = 5
2021-10-03 09:51:33.899890 x = 3
2021-10-03 09:51:33.907888 x = 4
2021-10-03 09:51:34.924889 x = 6
2021-10-03 09:51:34.925888 x = 7
Total elapsed time: 21.22123622894287
You can use the concurrent.futures library with a loop that runs 5 threads at a time. I limited the number of workers to 50, but it could probably be higher, since the threads do no CPU-intensive work. With the final sleep(1) statement you are guaranteed 5 calls per second or slightly fewer (because of the loop's processing time).
import pandas as pd
import numpy as np
import concurrent.futures, time

ids = pd.read_csv('data.csv')
ids = ids['Id'].values.tolist()

def dump_data(df, idx):
    filename = base_dir + '\\' + str(idx) + '.csv'
    df.to_csv(filename, header=True, index=False)  # write data to file

def get_api(idx):
    data = call_some_api(idx)  # API returns data as a pandas DataFrame; takes about 10 secs
    dump_data(data, idx)

N = 5   # number of requests per second
L = 10  # latency per request in seconds

with concurrent.futures.ThreadPoolExecutor(max_workers=N * L) as ex:
    # reshape IDs into a list of lists of N items each and loop over them
    for id_list in ((ids[i] for i in range(j * N, min(j * N + N, len(ids)))) for j in range(round(len(ids) / N + 0.5))):
        # wait if there are items on the work queue
        while ex._work_queue.qsize() > 0:
            time.sleep(1)
        ex.map(get_api, id_list)
        time.sleep(1)
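One caveat with the code above: _work_queue is a private attribute of ThreadPoolExecutor and could change between Python versions. Here is a roughly equivalent sketch using only public APIs (the helper name rate_limited_map is mine):

import concurrent.futures, itertools, time

def rate_limited_map(fn, items, per_second=5, max_workers=50):
    """Submit fn(item) in batches of per_second, pausing 1 second between batches."""
    futures = []
    it = iter(items)
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as ex:
        while True:
            batch = list(itertools.islice(it, per_second))
            if not batch:
                break
            futures.extend(ex.submit(fn, item) for item in batch)
            time.sleep(1)  # one batch per second
    return [f.result() for f in futures]

# usage: rate_limited_map(get_api, ids)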
You can get very fancy with something like this, but I doubt you will get much better performance than something dumb and simple, like submitting each task to a processing pool with just a fifth of a second of delay (see the sketch below). If your API calls release the GIL this is worth testing, because one thread per in-flight call takes far fewer resources than one process per in-flight call (although a process still works... hardware is cheap, right?).
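A minimal sketch of that dumb simple version, reusing ids and get_api from the question (the pool size of 100 is deliberately generous; see the sizing argument below):

from multiprocessing import Pool
from time import sleep

if __name__ == '__main__':
    with Pool(100) as pool:
        for idx in ids:
            pool.apply_async(get_api, (idx,))
            sleep(1 / 5)  # at most 5 submissions per second
        pool.close()
        pool.join()  # wait for all in-flight calls to finish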
One potential problem I can see is that if a particular batch of calls takes a long time and the pool's input queue starts to fill up, you might submit too many at once as soon as the slow tasks complete. A simple solution is to give the processing pool more workers than you strictly need: if the average call lasts 10 seconds and you submit 5 per second, you need 50 workers. You could easily give the pool 100 workers; as long as the API calls are not too CPU-intensive, this only costs some extra RAM.
Another potential problem is that the endpoint may only allow a certain number of simultaneous connections, which can sometimes be smaller than the number of in-flight calls you want. The solution to that is to cap the number of pool workers at the maximum number of connections, but this conflicts with the solution to the earlier problem.
Here is how I would solve both problems: give the pool workers an input queue of limited size, so the main thread cannot pile up a bunch of jobs on the input queue. I would do it like this:
import pandas as pd
from multiprocessing import Process, Queue
from time import sleep

def dump_data(df, idx):
    filename = base_dir + '\\' + str(idx) + '.csv'
    df.to_csv(filename, header=True, index=False)  # write data to file

def get_api(idx):
    data = call_some_api(idx)  # API returns data as a pandas DataFrame; takes about 10 secs
    dump_data(data, idx)

class STOPFLAG: pass

def worker(in_q):
    while True:
        task = in_q.get()
        if isinstance(task, STOPFLAG):
            return
        get_api(task)

def main():
    # int: n tasks per float: t seconds
    n = 5               # tasks per interval
    t = 1.0             # interval in seconds
    max_in_flight = 20  # limited to 20 concurrent connections

    ids = pd.read_csv('data.csv')
    ids = ids['Id'].values.tolist()

    work_q = Queue(maxsize=1)  # maxsize will cause q.put() to block until it's got space to put something
    pool = [Process(target=worker, args=(work_q,)) for _ in range(max_in_flight)]
    for p in pool: p.start()

    for idx in ids:
        work_q.put(idx)
        sleep(t / n)  # dumb but effective rate limit

    for p in pool: work_q.put(STOPFLAG())
    for p in pool: p.join()  # I p in pools :P

if __name__ == "__main__":
    main()
If your API endpoint returns rate-limit headers, you can easily modify the code so that the workers continually update some shared values, which lets the main thread make smarter decisions about when (and how many) tasks to launch:
from multiprocessing import Value

def worker(in_q, limit: Value, remaining: Value, reset: Value):
    while True:
        task = in_q.get()
        if isinstance(task, STOPFLAG):
            return
        response = get_api(task)
        if "X-Rate-Limit-Limit" in response.headers:
            with limit.get_lock():  # beyond 3.5 you can just call "with limit:" to acquire the lock for synchronization
                limit.value = int(response.headers["X-Rate-Limit-Limit"])  # headers are strings
        # same for "X-Rate-Limit-Remaining" and "X-Rate-Limit-Reset"

# then in main:
# ...
    for idx in ids:
        # get most recent number of remaining connections allowed:
        with remaining.get_lock():
            n = remaining.value
        if n > 0:
            work_q.put(idx)
        else:
            with reset.get_lock():
                sleeptime = reset.value - time.time()
            sleep(sleeptime)  # maybe add an extra bit of sleep time to account for possible clock mismatch
The only reasonably accurate way of doing this is to perform the rate limiting algorithm in the worker function itself, right at the precise point where it is about to call the API that needs rate limiting. When you try to do it at task-submission time instead, the problem becomes that you have no control over how the tasks are ultimately scheduled, so you can no longer guarantee the number of API calls per second. Moving the algorithm into each process, however, means you now have to use values in shared memory with locking to ensure everything works correctly.
This is accomplished by creating a RateLimiter class with two shared-memory values: one to hold the number of API calls made in the current interval and one to hold the time at which the current interval expires. Each process in the pool is initialized with a global variable that is an instance of this RateLimiter class, holding references to these shared values:
from multiprocessing import Pool, Lock, Value
import time

class RateLimiter:
    FUDGE_FACTOR = .0015  # to allow for imprecision of the timer: 1.5 ms

    def __init__(self, rate, per):
        assert isinstance(rate, int) and rate > 0
        assert isinstance(per, (int, float)) and per > 0
        self._rate = rate
        self._per = per
        self._lock = Lock()
        self._count = Value('i', False)       # 0
        self._expiration = Value('d', False)  # 0.0

    def __call__(self, api_func, *args, **kwargs):
        with self._lock:
            now = time.time()
            if now >= self._expiration.value:
                self._expiration.value = now + self._per + self.FUDGE_FACTOR
                self._count.value = 1
            elif self._count.value < self._rate:
                self._count.value += 1
            else:
                time.sleep(self._expiration.value - now)
                self._expiration.value = time.time() + self._per + self.FUDGE_FACTOR
                self._count.value = 1
        return api_func(*args, **kwargs)

def init_pool(the_rate_limiter):
    global rate_limiter
    rate_limiter = the_rate_limiter

def api_func(x):
    from datetime import datetime
    print(datetime.now(), 'x =', x)
    time.sleep(10 - x / 3)
    return x * x

def worker(x):
    return rate_limiter(api_func, x)

def main():
    # 5 api calls per second:
    rate_limiter = RateLimiter(5, 1.0)
    pool = Pool(20, initializer=init_pool, initargs=(rate_limiter,))
    results = pool.map(worker, range(20))
    pool.close()
    pool.join()
    print(results)

if __name__ == "__main__":
    main()
Prints:
2021-10-04 07:12:32.148725 x = 0
2021-10-04 07:12:32.171724 x = 3
2021-10-04 07:12:32.172724 x = 2
2021-10-04 07:12:32.173724 x = 1
2021-10-04 07:12:32.175724 x = 4
2021-10-04 07:12:33.151015 x = 7
2021-10-04 07:12:33.151015 x = 8
2021-10-04 07:12:33.151015 x = 5
2021-10-04 07:12:33.151015 x = 9
2021-10-04 07:12:33.152014 x = 6
2021-10-04 07:12:34.154015 x = 14
2021-10-04 07:12:34.154015 x = 13
2021-10-04 07:12:34.154015 x = 10
2021-10-04 07:12:34.155014 x = 12
2021-10-04 07:12:34.156015 x = 11
2021-10-04 07:12:35.155018 x = 16
2021-10-04 07:12:35.156014 x = 19
2021-10-04 07:12:35.156014 x = 17
2021-10-04 07:12:35.156014 x = 15
2021-10-04 07:12:35.157015 x = 18
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361]