Multithreading is not processing the complete list

I am using multithreading to visit links read from a CSV. Strangely, no matter what max_workers is set to, and even when I remove the multithreading part entirely, the code runs through fewer URLs than are in the list. I print the list to verify the count. For example, if the list has 5000 URLs, the code stops at 4084; if there are 13000 links, it stops at 9200; and even with only 130 links, it stops at 80. What am I doing wrong?

import requests
import xlrd
import concurrent.futures
from bs4 import BeautifulSoup
import csv

header_added = False
file_location = "Urls.xlsx"
workbook = xlrd.open_workbook(file_location)
sheet = workbook.sheet_by_index(0)
all_links = []
for row in range(1, 11000):
    all_links.append(sheet.cell_value(row, 0))
print(len(all_links))
i = 0

def get_solution(url):
    global header_added, i
    page = requests.get(url).text
    soup = BeautifulSoup(page, 'html.parser')
    ques_div = soup.find('p', class_='header-description')
    ques = ques_div.find('span').text
    ans_divs = soup.findAll('div', class_='puzzle-solution')
    ans = ans_divs[0].text
    print("Solution ", i)
    i += 1
    dict1 = {"Words": ques, "Solution": ans}
    with open('Results10k.csv', 'a+', encoding='utf-8') as f:
        w = csv.DictWriter(f, dict1.keys())
        if not header_added:
            w.writeheader()
            header_added = True
        w.writerow(dict1)

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    result = executor.map(get_solution, all_links)

Here is a rework of the code that needs no lock; instead, only a single process ever writes to the file.

Also, because of the GIL, a ThreadPool will be slower than a process-backed Pool.

import csv
import multiprocessing
import requests
import xlrd
from bs4 import BeautifulSoup
sess = requests.Session()

def get_solution(url):
    try:
        resp = sess.get(url)
        resp.raise_for_status()
        page = resp.text
        soup = BeautifulSoup(page, "html.parser")
        ques_div = soup.find("p", class_="header-description")
        ques = ques_div.find("span").text.strip()
        ans_divs = soup.findAll("div", class_="puzzle-solution")
        ans = ans_divs[0].text.strip()
        return {"URL": url, "Words": ques, "Solution": ans, "Error": ""}
    except Exception as exc:
        print(url, "Error:", exc)
        return {"URL": url, "Words": "", "Solution": "", "Error": str(exc)}

def read_links(file_location):
    workbook = xlrd.open_workbook(file_location)
    sheet = workbook.sheet_by_index(0)
    all_links = []
    for row in range(1, 11000):
        all_links.append(sheet.cell_value(row, 0))
    return all_links

def main():
    links = read_links("./Urls.xlsx")
    with open("Results10k.csv", "w", encoding="utf-8") as f:
        with multiprocessing.Pool() as p:  # (or multiprocessing.pool.ThreadPool)
            for i, result in enumerate(p.imap_unordered(get_solution, links, chunksize=16)):
                if i == 0:
                    writer = csv.DictWriter(f, result.keys())
                    writer.writeheader()
                writer.writerow(result)
                f.flush()  # Ensure changes are written immediately
                if i % 100 == 0:  # Progress indicator
                    print(i)

if __name__ == "__main__":
    main()
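For comparison, switching to the thread-backed pool mentioned in the comment only changes the import and the pool construction. Below is a minimal sketch of that variant; main_threaded is a hypothetical name, and it reuses read_links and get_solution from the code above:

from multiprocessing.pool import ThreadPool  # same imap_unordered API, backed by threads
import csv

def main_threaded():
    # Threads skip process start-up and argument pickling and share one requests.Session,
    # but the GIL serializes the BeautifulSoup parsing, so a process Pool is usually faster here.
    links = read_links("./Urls.xlsx")
    with open("Results10k.csv", "w", encoding="utf-8") as f:
        with ThreadPool(8) as p:
            writer = None
            for result in p.imap_unordered(get_solution, links, chunksize=16):
                if writer is None:  # build the writer from the first result's keys
                    writer = csv.DictWriter(f, result.keys())
                    writer.writeheader()
                writer.writerow(result)

Either way, imap_unordered keeps all file writes in the single parent loop, which is why no lock is needed.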

get_solution() is probably crashing on some of the URLs. You can add a try/except to the function body and write every URL that crashes to a separate file.

def get_solution(url):
    try:
        ...
    except Exception:
        with open('errors.txt', 'a+') as f:
            f.write(url + '\n')

If that is the problem, those counts should add up to the total. Also, open() may not be thread-safe:

import threading

file_lock = threading.Lock()

def get_solution(url):
    ...                   # fetch and parse as before
    with file_lock:       # only one thread writes to the file at a time
        with open('Results10k.csv', 'a+', encoding='utf-8') as f:
            w = csv.DictWriter(f, dict1.keys())
            ...
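Putting the two fixes together, here is a minimal, self-contained sketch of the original threaded function with both the try/except and the lock. The csv_lock name and the tab-separated errors.txt format are assumptions for illustration, not part of the original code:

import csv
import threading

import requests
from bs4 import BeautifulSoup

csv_lock = threading.Lock()   # guards every write to the shared output files
header_added = False

def get_solution(url):
    global header_added
    try:
        page = requests.get(url).text
        soup = BeautifulSoup(page, 'html.parser')
        ques = soup.find('p', class_='header-description').find('span').text
        ans = soup.findAll('div', class_='puzzle-solution')[0].text
    except Exception as exc:
        with csv_lock:        # the error log is shared between threads too
            with open('errors.txt', 'a', encoding='utf-8') as f:
                f.write(url + '\t' + str(exc) + '\n')
        return
    with csv_lock:            # serialize appends so header and rows don't interleave
        with open('Results10k.csv', 'a', encoding='utf-8', newline='') as f:
            w = csv.DictWriter(f, ['Words', 'Solution'])
            if not header_added:
                w.writeheader()
                header_added = True
            w.writerow({'Words': ques, 'Solution': ans})

With this in place, the number of rows in Results10k.csv plus the number of lines in errors.txt should equal the length of the URL list.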
