Processing dynamic URLs with Python multithreading



I'm new to threading in Python. I've gone through several articles but I still don't really understand how to use it. I've tried to put together a solution for my task and would like to check whether I'm approaching it the right way.

The task is: read a large CSV with roughly 20K records, take the ID from each record, and fire an HTTP API call for every record in the CSV.

import csv
import time
import threading
import requests

t1 = time.time()
file_data_obj = csv.DictReader(open(file_path, 'rU'))
threads = []
for record in file_data_obj:
    apiurl = "https://www.api-server.com?id=" + record.get("acc_id", "")
    thread = threading.Thread(target=requests.get, args=(apiurl,))
    thread.start()
    threads.append(thread)
t2 = time.time()
for thread in threads:
    thread.join()
print("Total time required to process a file - {} Secs".format(t2-t1))
  • Since there are 20K records, will this start 20K threads? Or does the OS/Python handle it? If so, can we limit it?
  • How do I collect the responses returned by requests.get?
  • Will t2 - t1 really give me the time it took to process the whole file?

Since there are 20K records, will this start 20K threads? Or does the OS/Python handle it? If so, can we limit it?

Yes - it will start one thread per iteration. The maximum number of threads you can create is limited by your OS.
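
If you want to cap how many requests run at the same time while still using raw threading, one option is a semaphore. A minimal sketch, assuming the same file_data_obj and URL pattern as in your code (the limit of 10 and the helper name fetch_with_limit are illustrative choices, not part of the original snippet):

import threading
import requests

MAX_CONCURRENT = 10  # illustrative cap on simultaneous requests
limiter = threading.BoundedSemaphore(MAX_CONCURRENT)

def fetch_with_limit(apiurl):
    # At most MAX_CONCURRENT threads can hold the semaphore at once;
    # the others block here until a slot is released.
    with limiter:
        return requests.get(apiurl)

threads = []
for record in file_data_obj:
    apiurl = "https://www.api-server.com?id=" + record.get("acc_id", "")
    t = threading.Thread(target=fetch_with_limit, args=(apiurl,))
    t.start()
    threads.append(t)

for t in threads:
    t.join()

Note that this still creates one Thread object per record; it only limits how many requests are in flight at a time. A worker pool (shown further down) avoids creating 20K threads in the first place.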

How do I collect the responses returned by requests.get?

If you only want to use the threading module, you have to use a Queue. By design a thread's target returns None to the caller, so you must set up a line of communication between the worker threads and your main loop yourself.

from queue import Queue
from threading import Thread
import requests

# Queue used to collect the responses produced by the worker threads
q = Queue()
threads = []

def return_get(q, apiurl):
    # Each worker puts its response object on the shared queue
    q.put(requests.get(apiurl))

for record in file_data_obj:
    apiurl = "https://www.api-server.com?id=" + record.get("acc_id", "")
    t = Thread(target=return_get, args=(q, apiurl))
    t.start()
    threads.append(t)

for thread in threads:
    thread.join()

while not q.empty():
    r = q.get()  # Fetches the next response from the queue
    print(r.text)

Another approach is to use a pool of workers.

from concurrent.futures import ThreadPoolExecutor
import requests

pool = ThreadPoolExecutor(10)  # at most 10 requests run concurrently
futures = []

# Submit one task per record to the pool
for record in file_data_obj:
    apiurl = "https://www.api-server.com?id=" + record.get("acc_id", "")
    futures.append(pool.submit(requests.get, apiurl))

# result() blocks until the corresponding request has finished
for future in futures:
    print(future.result())

You can use ThreadPoolExecutor.

Retrieve a single page and report the URL and its contents:

import concurrent.futures
import urllib.request

def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

Create the executor pool with N workers:

# URLS is the iterable of URLs to fetch; N_workers is the pool size
with concurrent.futures.ThreadPoolExecutor(max_workers=N_workers) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
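
Putting the pieces together for your task, here is a minimal end-to-end sketch (the worker count of 10 and the fetch helper are illustrative; file_path, the "acc_id" column, and the URL pattern are taken from your snippet). Note that the elapsed time here is measured only after every request has finished; in your original snippet t2 is taken before the join() calls, so t2 - t1 only tells you how long it took to start the threads, not how long the whole file took to process.

import csv
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(acc_id):
    # One HTTP call per CSV record, keyed by its account id
    return requests.get("https://www.api-server.com?id=" + acc_id)

t1 = time.time()
responses = []
with open(file_path, newline='') as f:
    reader = csv.DictReader(f)
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(fetch, record.get("acc_id", ""))
                   for record in reader]
        for future in as_completed(futures):
            responses.append(future.result())  # collect every Response object
t2 = time.time()  # taken only after all requests have completed
print("Total time required to process a file - {} Secs".format(t2 - t1))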
