concurrent futures threadpoolexecutor重复列表项



我是Python中多线程的新手,并试图管理调用数千个api的脚本。我读了很多答案和文章,得出了这个结论:

import requests
import json
import time
import sys
import concurrent.futures
import threading

thread_local = threading.local()
pet_data = []

def get_session():
if not hasattr(thread_local, "session"):
thread_local.session = requests.Session()
return thread_local.session

def scrape(url):
session = get_session()
with session.get(url) as response:
info = response.text
pet_data.append(json.loads(info))

def run_scrapes(sites):
with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
executor.map(scrape, sites)
executor.shutdown(wait=True)

sites是要调用的url的列表(它是一个分页API,所以它是一个简单的' API .endpoint&page='+str(i) urls '的列表)。

它工作得很好,但我遇到的问题是它重复调用,很多次(基于通过日志调试,每个URL的6被调用,即使在列表中只有1个)。

我从文章/答案中邮寄过来的代码中有什么问题吗?我承认我没有完全理解get_session函数,我想这可能是问题所在。

在如何使用线程的本地数据对象方面存在一个根本性的缺陷。该对象是线程本地的(因此得名),需要每个线程获取。像你这样从主线程中获取它,然后在get_session()中重用它,将在线程之间共享相同的Session对象。你需要总是在线程中获取它:

def get_session():
thread_local = threading.local()
if not hasattr(thread_local, "session"):
thread_local.session = requests.Session()
return thread_local.session

我不清楚Session对象是否线程安全(那里有冲突的信息)。这可能是你的问题的原因。

您在收集结果时也错误地使用了范式。您应该做的是从scrape()返回相应的数据。然后通过map()呼叫收取:

pet_data = list(executor.map(scrape, sites))

这部分并不重要。

附录

你的主要问题是,你没有使用正确的工具来完成这项工作。对于像HTTP请求这样的异步I/O,你应该使用Python的asyncio功能,例如使用httpx: https://www.python-httpx.org/async/

这样所有的请求都可以并发运行,即使你从一个线程启动它们。由于GIL的原因,多线程在很多情况下对Python没有多大帮助。

最新更新