如何在 Python 中创建和使用 http 并行管理器?



我是 Python 新手,我想创建一个类 ParallelHttpHandler,以并行(多线程)模式处理它收到的所有 HTTP 请求。客户端(即其他代码段中使用 ParallelHttpHandler 的类)必须通过传递一些参数(如方法、数据和 onsuccess 回调)来注册它们想要发出的每个 HTTP 请求。

下面是该类的示例:

import threading
import time
import logging
import random
import Queue
import sys
import requests
class ParallelHttpHandler(object):
    """Dispatch HTTP requests to a fixed pool of worker threads.

    Clients register requests through ``http_get`` / ``http_post``. Each
    request is described by a dict that is placed on a shared queue and
    consumed by one of ``POOL_SIZE`` ``RequestConsumerThread`` workers.
    """

    # Number of worker threads started by every handler instance.
    POOL_SIZE = 10

    def __init__(self):
        """Create the request queue and start the worker threads."""
        self.requests_queue = Queue.Queue()
        # A single re-entrant lock is handed to every worker.
        self.callback_lock = threading.RLock()
        self.pool_threads = [
            RequestConsumerThread(self.requests_queue, self.callback_lock)
            for _ in range(ParallelHttpHandler.POOL_SIZE)
        ]
        for thread in self.pool_threads:
            thread.start()

    def http_get(self, url, data, headers=None, onsuccess=None, onerror=None):
        """Queue a GET request; see ``http_request`` for the arguments."""
        self.http_request("get", url, data, headers, onsuccess, onerror)

    def http_post(self, url, data, headers=None, onsuccess=None, onerror=None):
        """Queue a POST request; see ``http_request`` for the arguments."""
        self.http_request("post", url, data, headers, onsuccess, onerror)

    def http_request(self, method, url, data, headers, onsuccess, onerror):
        """Queue a request description for the worker threads.

        Args:
            method: ``"get"`` or ``"post"``.
            url: Target URL.
            data: Payload forwarded to the HTTP call (may be ``None``).
            headers: Header dict, or ``None``.
            onsuccess: Callable stored under ``"onsuccess"``, or ``None``.
            onerror: Callable stored under ``"onerror"``, or ``None``.
        """
        # Enqueue unconditionally: put() waits for a free slot on a bounded
        # queue, so a registered request is never silently discarded.
        self.requests_queue.put({
            "method": method,
            "url": url,
            "data": data,
            "headers": headers,
            "onsuccess": onsuccess,
            "onerror": onerror,
        })

    def wait_all(self):
        """Block until every queued request has been marked as done."""
        self.requests_queue.join()
class RequestConsumerThread(threading.Thread):
    """Daemon worker that performs queued HTTP requests one at a time.

    Each queue item is a dict with the keys ``method``, ``url``, ``data``,
    ``headers``, ``onsuccess`` and ``onerror``. Callbacks are invoked while
    holding ``callback_lock``.
    """

    # Headers sent when a request does not supply its own.
    DEFAULT_HTTP_HEADERS = {
        'User-Agent': (
            'Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
            'AppleWebKit/537.36 (KHTML, like Gecko) '
            'Chrome/66.0.3359.181 Safari/537.36'
        ),
        'Accept': 'text/html, */*; q=0.01',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'en-US,en;q=0.8',
        'Connection': 'keep-alive'
    }

    def __init__(self, requests_queue, callback_lock):
        """Store the shared queue and lock and mark the thread as a daemon.

        Args:
            requests_queue: Queue the request dicts are read from.
            callback_lock: Lock held while a callback is running.
        """
        super(RequestConsumerThread, self).__init__()
        self.requests_queue = requests_queue
        self.callback_lock = callback_lock
        self.daemon = True

    def http_get(self, url, data=None, headers=None):
        """GET *url*; return the body text on status 200, otherwise ``None``."""
        # NOTE(review): ``data`` is sent as the request body; query-string
        # parameters would need ``params=`` -- confirm what callers expect.
        response = requests.get(url, data=data, headers=headers or RequestConsumerThread.DEFAULT_HTTP_HEADERS)
        return response.text if response.status_code == 200 else None

    def http_post(self, url, data, headers=None):
        """POST *data* to *url*; return the body text on status 200, otherwise ``None``."""
        response = requests.post(url, data=data, headers=headers or RequestConsumerThread.DEFAULT_HTTP_HEADERS)
        return response.text if response.status_code == 200 else None

    def run(self):
        """Consume requests forever, marking each one done once handled."""
        while True:
            # get() blocks until an item is available, so an idle worker
            # sleeps instead of polling the queue.
            request = self.requests_queue.get()
            try:
                self._handle(request)
            except Exception:
                # Raised by the onerror callback itself (or a malformed
                # request): log it and keep this worker alive.
                logging.exception("Unhandled error while processing a queued request")
            finally:
                self.requests_queue.task_done()

    def _handle(self, request):
        """Perform one request and pass the outcome to its callbacks."""
        try:
            # Perform the HTTP request.
            send = self.http_get if request["method"] == "get" else self.http_post
            response = send(request["url"], request["data"], request["headers"])
            # Run the success callback in a thread-safe context.
            self._run_callback(request["onsuccess"], response)
        except Exception as error:
            # Failures of the request and of the success callback are both
            # reported to the error callback.
            self._run_callback(request["onerror"], error)

    def _run_callback(self, callback, argument):
        """Call *callback* with *argument* while holding the callback lock."""
        if callback is None:
            return
        # The ``with`` block releases the lock even when the callback raises.
        with self.callback_lock:
            callback(argument)

现在的问题是:当我在 for 循环中使用这个类时,为了并行处理 50 个 http 请求,我分配给每个请求的回调有问题。

示例代码(实际代码比较复杂):

if __name__ == '__main__':
    def onsuccess(html_code, index):
        # Report which request completed; the page body itself is not used.
        print("Success: html received. index: " + str(index))

    def onerror(e):
        # Report the exception passed in by the worker.
        print("Error: " + str(e))

    http_handler = ParallelHttpHandler()
    targets = ["https://google.com?count=%d" % count for count in range(50)]
    for index, target in enumerate(targets):
        # NOTE: the lambda looks up `index` when it is called, not when it
        # is created.
        http_handler.http_get(
            target,
            None,
            headers=None,
            onsuccess=lambda html_code: onsuccess(html_code, index),
            onerror=onerror
        )
    # Block until every queued request has been processed.
    http_handler.wait_all()

此代码输出以下内容:

Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49
Success: html received. index: 49

为什么总是 49?我知道 Python 的参数传递方式类似于按引用调用(call-by-reference),但即使我传入 copy.copy(index),代码也不起作用。

这是由 Python 中闭包的后期绑定(late binding)和循环的作用域规则造成的:lambda 在被调用时才去查找变量 index 的值,而那时循环早已结束,index 停留在最后一个值上。这与复制以及整个 HTTP 部分无关,与按引用调用倒是有些关系。请看下面这个更简单的示例:

>>> lst = []
>>> for x in range(5):
...     lst.append(lambda : x)
... 
>>> for l in lst:
...     l()
... 
4
4
4
4
4

解决方案是用一个函数把 lambda 包装起来。每次调用该函数都会创建一个新的作用域,从而把当时的值固定下来:

>>> lst = []
>>> def factory(x):
...     return lambda : x
... 
>>> for x in range(5):
...     lst.append(factory(x))
... 
>>> for l in lst:
...     l()
... 
0
1
2
3
4

最新更新