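I have a Python function that requests data through an API and involves a rotating, expiring key. The number of requests means the function has to be parallelized to some degree. I am doing this with ThreadPool from the multiprocessing.pool module. Example code: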
import json

import requests
from multiprocessing.pool import ThreadPool
from tqdm import tqdm


# Function to get a new authorization key.
def get_new_authorization():
    ...
    return auth_key


# Calls the API for the given URL with the passed headers and returns a dict.
def api_call(headers_plus_result):
    headers, result = headers_plus_result
    r = requests.get(result["url"], headers=headers)
    return json.loads(r.text)


# Threading function with a default num_threads.
def thread(worker, jobs, num_threads=5):
    pool = ThreadPool(num_threads)
    results = list()
    for result in tqdm(pool.imap_unordered(worker, jobs), total=len(jobs)):
        if result:
            results.append(result)
    pool.close()
    pool.join()
    if results:
        return results


# Input is a list-of-dicts results of a previous process.
results = [...]

# Process starts by retrieving an authorization key.
headers = {"authorization": get_new_authorization()}

# api_call() is called on each existing result with the retrieved key.
results = thread(api_call, [(headers, result) for result in results])
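I am trying to modify my mapping process so that, when the first worker fails (i.e. the authorization key has expired), all other workers pause until a new authorization key has been retrieved. The workers then continue with the new key.
Should this be inserted into the actual thread() function? If I raise an exception in the api_call function itself, I don't know how to stop the pool manager or update the headers passed to the other workers.
Also: is ThreadPool even the best approach if I want this kind of flexibility?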
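A simpler possibility would be to use a multiprocessing.Event and a shared variable. The event indicates whether the authentication is currently valid, and the shared variable holds the authentication.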
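import multiprocessing as mp

event = mp.Event()
# 100 = max length in bytes. Typecode 'c' is used because only char arrays
# expose .value on the synchronized wrapper; values are read and written as bytes.
sharedAuthentication = mp.Array('c', 100)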
So the worker would run:
event.wait()
authentication = sharedAuthentication.value
Your main thread would initially set the authentication with:
sharedAuthentication.value = ....
event.set()
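Since the question uses ThreadPool, whose workers are threads inside one process, the same gating idea can probably be tried with threading.Event and an ordinary shared object. This is a minimal sketch, not the original code: the `shared` dict, the `worker` function and the "key-1" value are placeholders made up for illustration.

```python
import threading
import time
from multiprocessing.pool import ThreadPool

auth_ready = threading.Event()   # set while the shared key is considered valid
shared = {"key": None}           # hypothetical holder for the current key


def worker(job):
    auth_ready.wait()            # blocks until the main thread calls set()
    return (job, shared["key"])


pool = ThreadPool(3)
pending = pool.map_async(worker, range(6))  # workers start and block in wait()

time.sleep(0.1)
started_blocked = not pending.ready()       # nothing can finish before set()

shared["key"] = "key-1"                     # placeholder value
auth_ready.set()                            # release every waiting worker

gated_results = pending.get(timeout=5)
pool.close()
pool.join()
```

One caveat with a bare event: a worker that has already passed wait() when the main thread calls clear() will still send the old key, so some form of retry on failure is likely still needed.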
Later, the main thread would change the authentication with:
event.clear()
# ... calculate new authentication
sharedAuthentication.value = .....
event.set()
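To tie this back to the question (pause everyone when one worker hits an expired key, then carry on with the new one), here is one possible sketch. It swaps the Event for a threading.Lock: workers that fail queue up on the lock while a single worker fetches the new key, then each retries. Everything here is assumed for illustration: `FakeApi` stands in for the real service and for get_new_authorization(), and `ExpiredKey`, `KeyStore` and `make_worker` are hypothetical names. How the real API signals an expired key is not known from the question.

```python
import threading
from multiprocessing.pool import ThreadPool


class ExpiredKey(Exception):
    """Raised by the stand-in API when a key is no longer valid."""


class FakeApi:
    """Stand-in for the real service: each key is valid for a few calls."""

    def __init__(self, uses_per_key):
        self._lock = threading.Lock()
        self._uses_per_key = uses_per_key
        self._issued = 0
        self._current = None
        self._uses_left = 0

    def issue_key(self):
        with self._lock:
            self._issued += 1
            self._current = f"key-{self._issued}"
            self._uses_left = self._uses_per_key
            return self._current

    def call(self, key, url):
        with self._lock:
            if key != self._current or self._uses_left == 0:
                raise ExpiredKey(key)
            self._uses_left -= 1
        return {"url": url, "key": key}


class KeyStore:
    """Holds the current key and lets one thread at a time replace it."""

    def __init__(self, fetch_key):
        self._fetch_key = fetch_key
        self._lock = threading.Lock()
        self._key = fetch_key()
        self.refreshes = 0

    def get(self):
        with self._lock:
            return self._key

    def refresh(self, stale_key):
        # Workers holding the same stale key wait here; only the first one
        # fetches a new key, the others find it already replaced.
        with self._lock:
            if self._key == stale_key:
                self._key = self._fetch_key()
                self.refreshes += 1
            return self._key


def make_worker(store, api, max_attempts=10):
    def worker(result):
        key = store.get()
        for _ in range(max_attempts):
            try:
                return api.call(key, result["url"])
            except ExpiredKey:
                key = store.refresh(key)
        raise RuntimeError("gave up after repeated key expiry")
    return worker


api = FakeApi(uses_per_key=4)
store = KeyStore(api.issue_key)
jobs = [{"url": f"https://example.invalid/{i}"} for i in range(20)]

with ThreadPool(5) as pool:
    rotated = list(pool.imap_unordered(make_worker(store, api), jobs))
```

With a design like this the thread() function itself would not need to change; only the worker and the object it reads the key from do. In real code, `api.call` would be replaced by the requests.get call and the `except` clause by whatever check detects an expired key.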