如何分配可用于线程的值



我目前正在一个scraper上工作,我正试图弄清楚如何分配可使用的代理,这意味着如果我使用5个线程,如果线程-1使用代理a,则没有其他线程能够访问代理a,并且应该试着随机化所有可用的代理池。

import random
import time
from threading import Thread
import requests
list_op_proxy = [
"http://test.io:12345",
"http://test.io:123456",
"http://test.io:1234567",
"http://test.io:12345678"
]
session = requests.Session()

def handler(name):
while True:
try:
session.proxies = {
'https': f'http://{random.choice(list_op_proxy)}'
}
with session.get("https://stackoverflow.com"):
print(f"{name} - Yay request made!")
time.sleep(random.randint(5, 10))
except requests.exceptions as err:
print(f"Error! Lets try again! {err}")
continue
except Exceptions as err:
print(f"Error! Lets debug! {err}")
raise Exception

for i in range(5):
Thread(target=handler, args=(f'Thread {i}',)).start()

我想知道我如何才能创建一种方式,让我可以使用可用的代理,而不在任何线程中使用;块";代理不能用于其他线程并在完成后发布?

实现这一点的一种方法是只使用global共享列表,该列表包含当前活动的代理,或者remove列表中的代理,并在请求完成后读取它们。您不必担心列表上的并发访问,因为CPython受到GIL的影响。

proxy = random.choice(list_op_proxy)
list_op_proxy.remove(proxy)
session.proxies = {
'https': f'http://{proxy}'
}
# ... do request
list_op_proxy.append(proxy)

您也可以使用队列来完成此操作,只需弹出和添加即可使其更加高效。

使用代理队列

另一种选择是在每次查询之前将代理放入queueget()中作为代理,将其从可用代理中删除,并在请求完成后将其放回put()。这是上述列表方法的一个更有效的版本。

首先,我们需要初始化代理队列。


proxy_q = queue.Queue()
for proxy in proxies:
proxy_q.put(proxy)

handler中,我们在执行请求之前从队列中获取一个代理。我们执行请求并将代理放回队列
我们使用block=True,这样,如果当前没有可用的代理,queue就会阻塞线程。否则,一旦所有代理都在使用中,线程就会以queue.Empty异常终止,并且应该获取一个新的代理。

def handler(name):
global proxy_q
while True:
proxy = proxy_q.get(block=True) # we want blocking behaviour
# ... do request
proxy_q.put(proxy)
# ... response handling can be done after proxy put to not
# block it longer than required
# do not forget to define a break condition

使用队列和多处理

首先,您将初始化manager,并将所有数据放入队列,然后初始化另一个用于收集结果的结构(这里我们初始化一个共享列表(。

manager = multiprocessing.Manager()
q = manager.Queue()
for e in entities:
q.put(e)
print(q.qsize())
results = manager.list()

您初始化刮擦过程:

for proxy in proxies:
processes.append(multiprocessing.Process(
target=scrape_function,
args=(q, results, proxy)
daemon=True))

然后启动每个

for w in processes:
w.start()

最后,您join每个流程,以确保在子流程完成之前,主流程不会终止

for w in processes:
w.join()

scrape_function中,您只需一次get一个项目并执行请求。默认配置中的queue对象为空时会引发queue.Empty错误,因此我们使用了一个带有中断条件的无限while循环来捕获异常。

def scrape_function(q, results, proxy)
session = requests.Session()
session.proxies = {
'https': f'http://{proxy}'
}
while True:
try:
request_uri = q.get(block=False)
with session.get("https://stackoverflow.com"):
print(f"{name} - Yay request made!")
results.append(result)
time.sleep(random.randint(5, 10))
except queue.Empty:
break

每个查询的结果都被附加到结果列表中,结果列表也在不同的进程之间共享。

最新更新