What is causing this concurrent.futures deadlock? Code included



I have a concurrent.futures scraping script that I use for some low-level work. However, it has started misbehaving: it gets stuck and never finishes.

I was able to narrow the problem down to 17 URLs (out of a batch of 18k, and you can imagine how much fun that was). Something must be happening with one or more of these 17 URLs that causes the stall (deadlock?), even though I use timeouts for both the requests and the futures. The strange thing is that it does not seem to be caused by any single URL. When I run the code, I get logs about which URLs have finished, and the set of URLs that actually completes seems to change on every run, so no single URL stands out as the culprit.
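As an aside, concurrent.futures.as_completed() itself accepts a timeout argument, which turns a silent hang into a TimeoutError and lets you print the stragglers. A minimal diagnostic sketch, with a hypothetical fetch callable standing in for the real request function:

import concurrent.futures

def find_stragglers(url_list, fetch, overall_timeout=120):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        future_to_url = {executor.submit(fetch, url): url for url in url_list}
        pending = set(future_to_url.values())
        try:
            for future in concurrent.futures.as_completed(future_to_url, timeout=overall_timeout):
                pending.discard(future_to_url[future])
        except concurrent.futures.TimeoutError:
            # Leaving the with-block still waits on the stuck worker threads,
            # but at least the culprit URLs get reported first.
            print("Still not finished after", overall_timeout, "seconds:", pending)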

Any help is welcome.

(Run the function as-is. Do not use runBad=False, as it expects a list of tuples.)

EDIT1: This also happens with ProcessPoolExecutor.

EDIT2: The problem seems to be related to Retry. When I comment out these three lines and use a plain requests.get instead, it finishes without any problem. But why? Could this be a compatibility issue between the way Retry is implemented and concurrent.futures?

#    s = requests.Session()
#    retries = Retry(total=1, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], raise_on_status=False) # raise_on_status=False = returns a response instead of raising RetryError
#    s.mount("https://", HTTPAdapter(max_retries=retries))

EDIT3: Even this simple request gets stuck, so it really has to be the mounted HTTPAdapter/max_retries. I even tried it without urllib3's Retry(), with just max_retries=2. Still no luck. I raised an issue to see whether we are missing something: https://github.com/psf/requests/issues/5538

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # disabled SSL warnings

HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'}
TIMEOUT = 5
s = requests.Session()
retries = Retry(total=3, backoff_factor=1, status_forcelist=[503])
s.mount("https://", HTTPAdapter(max_retries=retries))
response = s.get('https://employdentllc.com', headers=HEADERS, timeout=TIMEOUT, verify=False)
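For what it is worth, one thing to check here is whether the 503 response carries a Retry-After header: a mounted Retry honors that header by default and sleeps for however long the server asks, which can look exactly like a hang. A minimal probe, using a plain requests.get with no adapter mounted so no retries or sleeping happen:

import requests
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# No mounted adapter, so a 503 comes straight back and we can inspect it.
r = requests.get('https://employdentllc.com', timeout=5, verify=False)
print(r.status_code, repr(r.headers.get('Retry-After')))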

Here is the original concurrent.futures code:

import requests
import concurrent.futures
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
from requests.exceptions import HTTPError
from requests.exceptions import SSLError
from requests.exceptions import ConnectionError
from requests.exceptions import Timeout
from requests.exceptions import TooManyRedirects
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # disabled SSL warnings
HEADERS = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36'}
TIMEOUT = 5
def getMultiRequest(url, runBad, bad_request, tTimeout):
    #print("url = ", url)
    s = requests.Session()
    retries = Retry(total=3, backoff_factor=5, status_forcelist=[429, 500, 502, 503, 504], raise_on_status=False) # raise_on_status=False = returns a response instead of raising RetryError
    s.mount("https://", HTTPAdapter(max_retries=retries))
    if runBad == False:
        try:
            response = s.get(url, headers=HEADERS, timeout=tTimeout, verify=False)

            # Processing stuff // some can be pretty long (Levenstein etc)

            ret = (url, response.url, response.status_code, "", len(response.content), "", "", "")
        except HTTPError as e:
            ret = (url, "", e.response.status_code, "", 0, "", "", False)
        except SSLError:
            ret = (url, "", 0, "SSL certificate verification failed", 0, "", "", False)
        except ConnectionError:
            ret = (url, "", 0, "Cannot establish connection", 0, "", "", False)
        except Timeout:
            ret = (url, "", 0, "Request timed out", 0, "", "", False)
        except TooManyRedirects:
            ret = (url, "", 0, "Too many redirects", 0, "", "", False)
        except Exception:
            ret = (url, "", 0, "Undefined exception", 0, "", "", False)
        return ret
    else:
        try:
            response = s.get(url, headers=HEADERS, timeout=tTimeout, verify=False)

            # Processing stuff // some can be pretty long (Levenstein etc)

            ret = (url, response.url, response.status_code, "", "")
        except Exception:
            ret = (url, "", 0, "", "")
        return ret

def getMultiRequestThreaded(urlList, runBad, logURLs, tOut):
    responseList = []
    if runBad == True:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_url = {executor.submit(getMultiRequest, url, runBad, "", tOut): url for url in urlList}
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future]
                try:
                    data = future.result(timeout=30)
                except Exception as exc:
                    data = (url, 0, str(type(exc)))
                finally:
                    if logURLs == True:
                        print("BAD URL done: '" + url + "'.")
                    responseList.append(data)
    else:
        with concurrent.futures.ThreadPoolExecutor() as executor:
            future_to_url = {executor.submit(getMultiRequest, url[0], runBad, url[1], tOut): url for url in urlList}
            for future in concurrent.futures.as_completed(future_to_url):
                url = future_to_url[future][0]
                try:
                    data = future.result(timeout=30)
                except Exception as exc:
                    data = (url, 0, str(type(exc)))
                finally:
                    if logURLs == True:
                        print("LEGIT URL done: '" + url + "'.")
                    responseList.append(data)
    return responseList

URLs = [
    'https://www.appyhere.com/en-us',
    'https://jobilant.work/da',
    'https://www.iworkjobsite.com.au/jobseeker-home.htm',
    'https://youtrust.jp/lp',
    'https://passioneurs.net/ar',
    'https://employdentllc.com',
    'https://www.ivvajob.com/default/index',
    'https://praceapp.com/en',
    'https://www.safecook.be/en/home-en',
    'https://www.ns3a.com/en',
    'https://www.andjaro.com/en/home',
    'https://sweatcoin.club/',
    'https://www.pursuitae.com',
    'https://www.jobpal.ai/en',
    'https://www.clinicoin.io/en',
    'https://www.tamrecruiting.com/applicant-tracking-system-software-recruitment-management-system-talent-management-software-from-the-applicant-manager',
    'https://dott.one/index.html'
]
output = getMultiRequestThreaded(URLs, True, True, TIMEOUT)

I modified the program to add all of the URLs to a set. As each URL fetch completed in the for future in concurrent.futures.as_completed(future_to_url): loop (whether good or bad), I removed that URL from the set and printed the current contents of the set. That way, when the program finally hung, I knew exactly what was still outstanding: it was always the URLs https://employdentllc.com and https://www.pursuitae.com.
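A minimal sketch of that bookkeeping, reusing getMultiRequest and the loop from the question (only the pending set is new):

pending = set(urlList)  # everything we are still waiting on

with concurrent.futures.ThreadPoolExecutor() as executor:
    future_to_url = {executor.submit(getMultiRequest, url, True, "", TIMEOUT): url for url in urlList}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        pending.discard(url)  # finished, good or bad
        print("done:", url, "| still pending:", sorted(pending))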

When I tried fetching those URLs myself, each returned a 503 Service Unavailable error. And indeed, when I commented out the following two lines, the program ran to completion.

retries = Retry(total=3, backoff_factor=5, status_forcelist=[429, 500, 502, 503, 504], raise_on_status=False) # raise_on_status=False = returns a response instead of raising RetryError
s.mount("https://", HTTPAdapter(max_retries=retries))

Merely removing code 503 from the list did not help. Either there is some other error in this Retry specification (although it looks correct to me, apart from a rather large backoff_factor, which I reduced just to be sure I was waiting long enough), or there is a problem in requests/urllib3.

Here is a printout of each result in the output variable:

('https://www.appyhere.com/en-us', 'https://www.appyhere.com/en-us', 200, '', '')
('https://www.iworkjobsite.com.au/jobseeker-home.htm', 'https://www.iworkjobsite.com.au/jobseeker-home.htm', 200, '', '')
('https://passioneurs.net/ar', 'https://passioneurs.net/ar', 404, '', '')
('https://youtrust.jp/lp', 'https://youtrust.jp/lp', 200, '', '')
('https://jobilant.work/da', 'https://jobilant.work/da/', 200, '', '')
('https://employdentllc.com', 'https://employdentllc.com/', 503, '', '')
('https://www.ivvajob.com/default/index', 'https://www.ivvajob.com/default/index', 200, '', '')
('https://www.ns3a.com/en', 'https://www.ns3a.com/en', 200, '', '')
('https://www.safecook.be/en/home-en', 'https://www.safecook.be/en/home-en/', 200, '', '')
('https://sweatcoin.club/', 'https://sweatcoin.club/', 200, '', '')
('https://www.andjaro.com/en/home', 'https://www.andjaro.com/en/home/', 200, '', '')
('https://praceapp.com/en', 'https://praceapp.com/en/', 200, '', '')
('https://www.clinicoin.io/en', 'https://www.clinicoin.io/en', 200, '', '')
('https://www.jobpal.ai/en', 'https://www.jobpal.ai/en/', 200, '', '')
('https://dott.one/index.html', 'https://dott.one/index.html', 200, '', '')
('https://www.tamrecruiting.com/applicant-tracking-system-software-recruitment-management-system-talent-management-software-from-the-applicant-manager', 'https://www.tamrecruiting.com/applicant-tracking-system-software-recruitment-management-system-talent-management-software-from-the-applicant-manager', 404, '', '')
('https://www.pursuitae.com', 'https://www.pursuitae.com/', 503, '', '')

UPDATE

I found the problem. You need the respect_retry_after_header=False parameter:

retries = Retry(total=3, backoff_factor=5, status_forcelist=[429, 500, 502, 503, 504], raise_on_status=False, respect_retry_after_header=False) # raise_on_status=False = returns a response instead of raising RetryError

You will probably also want to reduce the backoff_factor to 1.
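Alternatively, if you want to keep honoring Retry-After but bound how long a worker thread can sleep on it, one option is to subclass Retry and cap get_retry_after(). A sketch with an arbitrary 10-second cap (CappedRetry is not from the original post):

import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

class CappedRetry(Retry):
    # Honor Retry-After, but never sleep longer than this many seconds.
    RETRY_AFTER_CAP = 10  # arbitrary; tune to taste

    def get_retry_after(self, response):
        retry_after = super().get_retry_after(response)
        if retry_after is None:
            return None
        return min(retry_after, self.RETRY_AFTER_CAP)

s = requests.Session()
retries = CappedRetry(total=3, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504], raise_on_status=False)
s.mount("https://", HTTPAdapter(max_retries=retries))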

This now seems to be a duplicate of the question about the "Retry" python requests module hanging.

I was able to reproduce the deadlock. I am not sure why it happens, but it does not occur when using multiprocessing.pool.ThreadPool():

from multiprocessing.pool import ThreadPool
import requests
import urllib3

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/74.0.3729.169 Safari/537.36"
}

def do_request(job):
    (sess, url, timeout) = job  # no `istarmap_unordered`...
    try:
        response = sess.get(url, headers=HEADERS, timeout=timeout, verify=False)
        return (url, response.url, response.status_code, "", "")
    except Exception as exc:
        return (url, "", 0, str(exc), "")

def get_responses_threaded(url_list):
    with ThreadPool() as p, requests.Session() as sess:
        jobs = [(sess, url, 5) for url in url_list]  # no `istarmap_unordered`...
        yield from p.imap_unordered(do_request, jobs)

urls = [
    # ...
]

for resp in get_responses_threaded(urls):
    print(resp)
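One design choice worth noting in this sketch: the Session is created in the with statement and handed to every job through its argument tuple, so all workers reuse one connection pool instead of building a fresh session per request. Sharing a Session across threads generally works in practice, though it is not officially documented as thread-safe.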
