我目前正在一个scraper上工作,我正试图弄清楚如何分配可使用的代理,这意味着如果我使用5个线程,如果线程-1使用代理a,则没有其他线程能够访问代理a,并且应该试着随机化所有可用的代理池。
import random
import time
from threading import Thread
import requests
list_op_proxy = [
"http://test.io:12345",
"http://test.io:123456",
"http://test.io:1234567",
"http://test.io:12345678"
]
session = requests.Session()
def handler(name):
while True:
try:
session.proxies = {
'https': f'http://{random.choice(list_op_proxy)}'
}
with session.get("https://stackoverflow.com"):
print(f"{name} - Yay request made!")
time.sleep(random.randint(5, 10))
except requests.exceptions as err:
print(f"Error! Lets try again! {err}")
continue
except Exceptions as err:
print(f"Error! Lets debug! {err}")
raise Exception
for i in range(5):
Thread(target=handler, args=(f'Thread {i}',)).start()
我想知道我如何才能创建一种方式,让我可以使用可用的代理,而不在任何线程中使用;块";代理不能用于其他线程并在完成后发布?
实现这一点的一种方法是只使用global
共享列表,该列表包含当前活动的代理,或者remove
列表中的代理,并在请求完成后读取它们。您不必担心列表上的并发访问,因为CPython受到GIL的影响。
proxy = random.choice(list_op_proxy)
list_op_proxy.remove(proxy)
session.proxies = {
'https': f'http://{proxy}'
}
# ... do request
list_op_proxy.append(proxy)
您也可以使用队列来完成此操作,只需弹出和添加即可使其更加高效。
使用代理队列
另一种选择是在每次查询之前将代理放入queue
和get()
中作为代理,将其从可用代理中删除,并在请求完成后将其放回put()
。这是上述列表方法的一个更有效的版本。
首先,我们需要初始化代理队列。
proxy_q = queue.Queue()
for proxy in proxies:
proxy_q.put(proxy)
在handler
中,我们在执行请求之前从队列中获取一个代理。我们执行请求并将代理放回队列
我们使用block=True
,这样,如果当前没有可用的代理,queue
就会阻塞线程。否则,一旦所有代理都在使用中,线程就会以queue.Empty
异常终止,并且应该获取一个新的代理。
def handler(name):
global proxy_q
while True:
proxy = proxy_q.get(block=True) # we want blocking behaviour
# ... do request
proxy_q.put(proxy)
# ... response handling can be done after proxy put to not
# block it longer than required
# do not forget to define a break condition
使用队列和多处理
首先,您将初始化manager
,并将所有数据放入队列,然后初始化另一个用于收集结果的结构(这里我们初始化一个共享列表(。
manager = multiprocessing.Manager()
q = manager.Queue()
for e in entities:
q.put(e)
print(q.qsize())
results = manager.list()
您初始化刮擦过程:
for proxy in proxies:
processes.append(multiprocessing.Process(
target=scrape_function,
args=(q, results, proxy)
daemon=True))
然后启动每个
for w in processes:
w.start()
最后,您join
每个流程,以确保在子流程完成之前,主流程不会终止
for w in processes:
w.join()
在scrape_function
中,您只需一次get
一个项目并执行请求。默认配置中的queue
对象为空时会引发queue.Empty
错误,因此我们使用了一个带有中断条件的无限while循环来捕获异常。
def scrape_function(q, results, proxy)
session = requests.Session()
session.proxies = {
'https': f'http://{proxy}'
}
while True:
try:
request_uri = q.get(block=False)
with session.get("https://stackoverflow.com"):
print(f"{name} - Yay request made!")
results.append(result)
time.sleep(random.randint(5, 10))
except queue.Empty:
break
每个查询的结果都被附加到结果列表中,结果列表也在不同的进程之间共享。