Can't modify the function to make it stick to a single working proxy



I've written a script in Python using proxies and multiprocessing to send requests to some links concurrently in order to parse the product names from there. My current attempt does the job, but badly: it slows things down by bringing three new proxies into play on every call, no matter whether the proxy currently in use is good or bad.

Since I used multiprocessing, as in multiprocessing.dummy within the script, I'd like to modify the parse_product_info() function in such a way that it doesn't call process_proxy() multiple times to produce three new proxies unless the current proxy has been identified as a bad one. To be clearer: with my current attempt I can see that, regardless of whether the running proxy is good or bad, three new proxies come into play on every call when a link is used within parse_product_info(link), because I used 3 within Pool().

This is what I've tried with:

import random
import requests
from bs4 import BeautifulSoup
from multiprocessing.dummy import Pool

linklist = [
    'https://www.amazon.com/dp/B00OI0RGGO',
    'https://www.amazon.com/dp/B00TPKOPWA',
    'https://www.amazon.com/dp/B00TH42HWE',
    'https://www.amazon.com/dp/B00TPKNREM',
]

def process_proxy():
    global proxyVault
    if len(proxyVault) != 0:
        random.shuffle(proxyVault)
        proxy_url = proxyVault.pop()
        proxy = {'https': f'http://{proxy_url}'}
    else:
        proxy = None
    return proxy

def parse_product_info(link):
    global proxy
    try:
        # If the proxy variable doesn't hold any proxy yet, jump to the except
        # block to grab one, as long as the proxy list is not empty.
        if not proxy:
            raise
        print("proxy to be used:", proxy)
        res = requests.get(link, proxies=proxy, timeout=5)
        soup = BeautifulSoup(res.text, "html5lib")
        try:
            product_name = soup.select_one("#productTitle").get_text(strip=True)
        except Exception:
            product_name = ""
        print(link, product_name)
    except Exception:
        proxy = process_proxy()
        if proxy != None:
            return parse_product_info(link)
        else:
            pass

if __name__ == '__main__':
    proxyVault = ['103.110.37.244:36022', '180.254.218.229:8080', '110.74.197.207:50632', '1.20.101.95:49001', '200.10.193.90:8080', '173.164.26.117:3128', '103.228.118.66:43002', '178.128.231.201:3128', '1.2.169.54:55312', '181.52.85.249:31487', '97.64.135.4:8080', '190.96.214.123:53251', '52.144.107.142:31923', '45.5.224.145:52035', '89.218.22.178:8080', '192.241.143.186:80', '113.53.29.218:38310', '36.78.131.182:39243']
    pool = Pool(3)
    pool.map(parse_product_info, linklist)

How can I modify the parse_product_info() function so that it sticks to a single proxy for as long as that proxy is working?

First of all: although you are using the multiprocessing module, you are effectively using multithreading here, because .dummy uses threads instead of processes.
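
A quick way to see this, separate from the solution itself, is to ask the pool workers who they are (the names whoami and the pool size here are just for this illustration):

import threading
from multiprocessing.dummy import Pool

def whoami(_):
    # Workers created by multiprocessing.dummy are ordinary threads.
    return threading.current_thread().name

if __name__ == '__main__':
    with Pool(3) as pool:
        print(set(pool.map(whoami, range(6))))  # e.g. {'Thread-1', 'Thread-2', 'Thread-3'}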

I originally assumed multithreading would be fine for the OP, since nothing in the example pointed to CPU-bound work, but since we now know the OP may really want to use multiprocessing, I'm providing a multiprocessing solution as well.

The OP's example needs a rework of the synchronization around the whole proxy handling. I've simplified the example by mocking the request part and dropping the soup part, since it isn't essential to the problem.


Multiprocessing

This solution uses a multiprocessing.Value as a shared counter for indexing into the proxy list. If a worker runs into a timeout, it increments the shared index. The shared counter and the proxy list are registered once at (worker-)process start-up with the help of Pool's initializer parameter.

It's important to use a lock for any non-atomic operation on shared, non-static resources. By default, a multiprocessing.Value comes with a multiprocessing.RLock attached, which is the one we use here.
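
As an isolated sketch of that pattern (the names bump, init_worker and counter are made up for illustration), a Value('i', 0) can be read and incremented safely through the lock returned by its get_lock() method:

from multiprocessing import Pool, Value

def bump(_):
    # Hold the Value's built-in RLock; the read-modify-write below is not atomic.
    with counter.get_lock():
        counter.value += 1
        return counter.value

def init_worker(shared_counter):
    # Register the shared Value once per worker process.
    global counter
    counter = shared_counter

if __name__ == '__main__':
    counter = Value('i', 0)
    with Pool(3, initializer=init_worker, initargs=(counter,)) as pool:
        print(sorted(pool.map(bump, range(6))))  # [1, 2, 3, 4, 5, 6]

The solution below applies the same pattern to the proxy index: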

import time
import random
import logging
from multiprocessing import Pool, Value, get_logger, log_to_stderr

def request_get(link, proxies, timeout):
    """Dummy request.get()"""
    res = random.choices(["Result", "Timeout"], [0.5, 0.5])
    if res[0] == "Result":
        time.sleep(random.uniform(0, timeout))
        return f"{res[0]} from {link}"
    else:
        time.sleep(timeout)
        raise TimeoutError

def parse_product_info(link):
    global proxy_list, proxy_index
    while True:
        with proxy_index.get_lock():
            idx = proxy_index.value
        try:
            proxy = {'https': proxy_list[idx]}
        except IndexError:
            # get_logger().info("No proxies left.")
            return
        try:
            # get_logger().info(f"attempt using: {proxy}")
            res = request_get(link, proxies=proxy, timeout=5)
        except TimeoutError:
            # get_logger().info(f"timeout with: {proxy}")
            with proxy_index.get_lock():
                # check with lock held if index is still the same
                if idx == proxy_index.value:
                    proxy_index.value += 1
                    # get_logger().info(f"incremented index: {proxy_index.value}")
        else:
            # get_logger().info(f"processing: {res}")
            return

def _init_globals(proxy_list, proxy_index):
    globals().update(
        {'proxy_list': proxy_list, 'proxy_index': proxy_index}
    )

Main:

if __name__ == '__main__':

    log_to_stderr(logging.INFO)

    links = [
        'https://www.amazon.com/dp/B00OI0RGGO',
        'https://www.amazon.com/dp/B00TPKOPWA',
        'https://www.amazon.com/dp/B00TH42HWE',
        'https://www.amazon.com/dp/B00TPKNREM',
    ]

    proxies = [
        '103.110.37.244:36022', '180.254.218.229:8080', '110.74.197.207:50632',
        '1.20.101.95:49001', '200.10.193.90:8080', '173.164.26.117:3128',
        '103.228.118.66:43002', '178.128.231.201:3128', '1.2.169.54:55312',
        '181.52.85.249:31487', '97.64.135.4:8080', '190.96.214.123:53251',
        '52.144.107.142:31923', '45.5.224.145:52035', '89.218.22.178:8080',
        '192.241.143.186:80', '113.53.29.218:38310', '36.78.131.182:39243'
    ]

    proxies = [f"http://{proxy}" for proxy in proxies]
    proxy_index = Value('i', 0)

    with Pool(
        processes=3,
        initializer=_init_globals,
        initargs=(proxies, proxy_index)
    ) as pool:
        pool.map(parse_product_info, links)

Example output:

[INFO/MainProcess] allocating a new mmap of length 4096
[INFO/ForkPoolWorker-1] child process calling self.run()
...
[INFO/ForkPoolWorker-1] attempt using: {'https': 'http://103.110.37.244:36022'}
[INFO/ForkPoolWorker-2] attempt using: {'https': 'http://103.110.37.244:36022'}
[INFO/ForkPoolWorker-3] attempt using: {'https': 'http://103.110.37.244:36022'}
[INFO/ForkPoolWorker-2] processing: Result from https://www.amazon.com/dp/B00TPKOPWA
[INFO/ForkPoolWorker-2] attempt using: {'https': 'http://103.110.37.244:36022'}
[INFO/ForkPoolWorker-3] timeout with: {'https': 'http://103.110.37.244:36022'}
[INFO/ForkPoolWorker-3] incremented index: 1
[INFO/ForkPoolWorker-3] attempt using: {'https': 'http://180.254.218.229:8080'}
[INFO/ForkPoolWorker-1] timeout with: {'https': 'http://103.110.37.244:36022'}
[INFO/ForkPoolWorker-1] attempt using: {'https': 'http://180.254.218.229:8080'}
[INFO/ForkPoolWorker-3] processing: Result from https://www.amazon.com/dp/B00TH42HWE
[INFO/ForkPoolWorker-2] processing: Result from https://www.amazon.com/dp/B00TPKNREM
[INFO/ForkPoolWorker-1] processing: Result from https://www.amazon.com/dp/B00OI0RGGO
[INFO/ForkPoolWorker-3] process shutting down
[INFO/ForkPoolWorker-2] process shutting down
...
Process finished with exit code 0

Multithreading

The suggestion below synchronizes the proxy handling with the help of a threading.Lock (also available wrapped as multiprocessing.dummy.Lock), which is possible because multiprocessing.dummy only uses threads.

Note that, by contrast, multiprocessing.Lock (the one not from .dummy) is a heavyweight (comparatively slow) IPC lock that would affect overall performance, depending on how often you synchronize.
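
As a side note, not needed for the solution: in CPython, multiprocessing.dummy simply re-exports the threading primitives, which you can check like this:

import threading
from multiprocessing.dummy import Lock

print(Lock is threading.Lock)  # True in CPython: .dummy re-exports threading's Lock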

Edit:

The multithreading solution has been refactored from an earlier draft to pick up the design of the multiprocessing solution above. parse_product_info() is now nearly identical for multithreading and multiprocessing.

import time
import random
import logging
from multiprocessing.dummy import Pool, Lock

get_logger = logging.getLogger

def request_get(link, proxies, timeout):
    ...  # same as in the multiprocessing solution above

def parse_product_info(link):
    global proxies, proxy_index
    while True:
        with proxy_lock:
            idx_proxy = proxy_index
        try:
            proxy = {'https': proxies[idx_proxy]}
        except IndexError:
            # get_logger().info("No proxies left.")
            return
        try:
            # get_logger().info(f"attempt using: {proxy}")
            res = request_get(link, proxies=proxy, timeout=5)
        except TimeoutError:
            # get_logger().info(f"timeout with: {proxy}")
            with proxy_lock:
                if idx_proxy == proxy_index:
                    proxy_index += 1
                    # get_logger().info(f"incremented index: {proxy_index}")
        else:
            # get_logger().info(f"processing: {res}")
            return

def init_logging(level=logging.INFO):
    fmt = '[%(asctime)s %(threadName)s] --- %(message)s'
    logging.basicConfig(format=fmt, level=level)
    return logging.getLogger()

Main:

if __name__ == '__main__':

    init_logging()

    links = ...  # same as in the multiprocessing solution above
    proxies = ...  # same as in the multiprocessing solution above

    proxy_index = 0
    proxy_lock = Lock()

    with Pool(processes=3) as pool:
        pool.map(parse_product_info, links)

Example output:

[2019-12-18 01:40:25,799 Thread-1] --- attempt using: {'https': 'http://103.110.37.244:36022'}
[2019-12-18 01:40:25,799 Thread-2] --- attempt using: {'https': 'http://103.110.37.244:36022'}
[2019-12-18 01:40:25,799 Thread-3] --- attempt using: {'https': 'http://103.110.37.244:36022'}
[2019-12-18 01:40:26,164 Thread-1] --- processing: Result from https://www.amazon.com/dp/B00OI0RGGO
[2019-12-18 01:40:26,164 Thread-1] --- attempt using: {'https': 'http://103.110.37.244:36022'}
[2019-12-18 01:40:29,568 Thread-1] --- processing: Result from https://www.amazon.com/dp/B00TPKNREM
[2019-12-18 01:40:30,800 Thread-2] --- timeout with: {'https': 'http://103.110.37.244:36022'}
[2019-12-18 01:40:30,800 Thread-2] --- incremented index: 1
[2019-12-18 01:40:30,800 Thread-2] --- attempt using: {'https': 'http://180.254.218.229:8080'}
[2019-12-18 01:40:30,800 Thread-3] --- timeout with: {'https': 'http://103.110.37.244:36022'}
[2019-12-18 01:40:30,801 Thread-3] --- attempt using: {'https': 'http://180.254.218.229:8080'}
[2019-12-18 01:40:32,941 Thread-3] --- processing: Result from https://www.amazon.com/dp/B00TH42HWE
[2019-12-18 01:40:34,677 Thread-2] --- processing: Result from https://www.amazon.com/dp/B00TPKOPWA
Process finished with exit code 0

In reply to the OP's latest comment:

If you want, you can swap in a new proxy list inside the IndexError exception-handler block once all proxies have been used up. In the code, swap the return there for something like the snippet below (a fuller sketch of where it sits follows after it):

with proxy_lock:
    proxies = new_proxies
    proxy_index = 0
continue
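
Put together, and purely as a sketch (get_new_proxies() is a hypothetical helper you'd have to supply, and in practice you'd want to stop refreshing after some maximum number of attempts), the threaded parse_product_info() with a proxy-list refresh might look roughly like this:

def parse_product_info(link):
    global proxies, proxy_index
    while True:
        with proxy_lock:
            idx_proxy = proxy_index
        try:
            proxy = {'https': proxies[idx_proxy]}
        except IndexError:
            with proxy_lock:
                # Re-check with the lock held; another thread may have refreshed already.
                if proxy_index >= len(proxies):
                    proxies = get_new_proxies()  # hypothetical helper returning a fresh list
                    proxy_index = 0
            continue
        try:
            res = request_get(link, proxies=proxy, timeout=5)
        except TimeoutError:
            with proxy_lock:
                if idx_proxy == proxy_index:
                    proxy_index += 1
        else:
            return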

The problem you're running into is a race condition: multiple processes (threads really, since you're using multiprocessing.dummy) see that proxy isn't initialized yet and all try to fetch a new proxy.

You can implement a get_proxy function using Python's multiprocessing.Lock:

from multiprocessing import Lock

lock = Lock()

def get_proxy():
    global proxy
    try:
        # If proxy is already set, then there is no need to wait for the lock
        if proxy:
            return proxy
    except NameError:
        pass
    with lock:
        try:
            # If proxy was set while waiting for the lock, then don't fetch another
            if proxy:
                return proxy
        except NameError:
            pass
        proxy = process_proxy()
        return proxy

Together with a clear_proxy function:

def clear_proxy(non_working_proxy):
    global proxy
    if non_working_proxy is None:
        return
    with lock:
        if proxy == non_working_proxy:
            proxy = None

Then call these in the parse_product_info function instead of process_proxy:

def parse_product_info(link):
    # global proxy       # Remove this
    proxy = get_proxy()  # Add this
    try:
        ...
    except Exception:
        # proxy = process_proxy()  # Remove this
        clear_proxy(proxy)         # Add this to clear a non-working proxy
        proxy = get_proxy()        # Add this to queue up for a new proxy
        if proxy != None:
            return parse_product_info(link)
        else:
            pass

A note about multiprocessing.dummy

As Mr. Miyagi mentioned in a comment on the question:

Note that multiprocessing.dummy doesn't use multiple processes, only threads. Real multiprocessing behaves differently; in particular, global data is not shared.

In that case, I believe you could read/write the proxy from/to a file inside the with lock block.
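
A rough, untested sketch of that idea, in case you do switch to real multiprocessing: the file name and the read_proxy/write_proxy helpers below are made up for illustration, and the lock is passed in through the Pool initializer so that every worker process shares the same one.

import os
from multiprocessing import Pool, Lock

PROXY_FILE = 'current_proxy.txt'  # made-up file name for this sketch

def _init_worker(shared_lock):
    # Register the shared lock once per worker process.
    global lock
    lock = shared_lock

def read_proxy():
    # Return the currently stored proxy dict, or None if none is stored.
    with lock:
        if os.path.exists(PROXY_FILE):
            with open(PROXY_FILE) as f:
                proxy_url = f.read().strip()
            if proxy_url:
                return {'https': proxy_url}
        return None

def write_proxy(proxy_url):
    # Store the proxy URL ('' clears it); workers pick it up via read_proxy().
    with lock:
        with open(PROXY_FILE, 'w') as f:
            f.write(proxy_url or '')

if __name__ == '__main__':
    lock = Lock()
    with Pool(3, initializer=_init_worker, initargs=(lock,)) as pool:
        ...  # pool.map(parse_product_info, linklist) as before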
