在Python中进行多线程http请求



我正试图编写一个XSS扫描程序来查找可用于请求参数的未过滤字符。这里的问题是代码运行速度变慢,因为必须发送大量http请求。这是代码:

from time import sleep
import requests
import urllib.parse
import re
from bs4 import BeautifulSoup
import urllib3
import argparse
urllib3.disable_warnings()
# Command-line interface for the scanner.
parser = argparse.ArgumentParser(description="XSS Scanner")
parser.add_argument("-u", "--url",
                    required=True,
                    help='Specify the URL.')
parser.add_argument("-t", "--threads",
                    default=5,
                    type=int,
                    help='Number of threads to send HTTP requests.')
parser.add_argument("-o", "--output",
                    # None (not False) is the idiomatic "flag absent" sentinel;
                    # downstream code only truth-tests this value.
                    default=None,
                    help='Use this switch if you want to store data to a file.')
parser.add_argument("-s", "--sleep",
                    # Default to 0 seconds: sleep(0) is a no-op, matching the
                    # old behavior of sleep(False) without the type confusion.
                    default=0,
                    type=int,
                    help='Use this switch if you want to have time interval.')
options = parser.parse_args()

# One shared Session reuses the underlying TCP connection across requests to
# the same host (connection pooling) instead of re-opening a connection for
# every requests.get() call — a large speedup for a scanner that hammers
# one target.
_session = requests.Session()

def send_request(url):
    """GET *url* and return the raw response body as bytes.

    Redirects are followed. TLS certificate verification is deliberately
    disabled because scan targets frequently use self-signed certificates.
    The timeout prevents a single hung server from stalling the whole scan.
    """
    response = _session.get(url, allow_redirects=True, verify=False, timeout=10)
    return response.content

def extract_forms_names(url_response):
    """Return every ``name`` attribute value found inside ``<form>`` tags.

    ``url_response`` is the raw HTML response body (bytes or str).

    Using ``find_all(attrs={'name': True})`` fixes two defects of the
    original direct-child iteration: it never trips over bare text nodes
    (so no blanket ``try/except`` is needed), and it also finds fields
    nested inside wrapper elements such as ``<div>`` or ``<label>``.
    """
    extracted_names = []
    soup = BeautifulSoup(url_response, features='lxml')
    for form in soup.find_all('form'):
        # attrs={'name': True} matches any descendant tag carrying a name attribute.
        for field in form.find_all(attrs={'name': True}):
            extracted_names.append(field['name'])
    return extracted_names

def extract_javascript_variables(Target_response):
    """Return identifiers declared with ``var``/``const``/``let`` in the body.

    ``Target_response`` is the raw response body as bytes (UTF-8 decoded here).

    The original pattern had lost its backslashes (``s+(w+)``), so it matched
    literal 's' and 'w' characters instead of whitespace and word characters.
    ``\\b`` additionally prevents words like "avatar" from matching "var".
    """
    regex_pattern = r'\b(?:var|const|let)\s+(\w+)'
    extracted_variables = re.findall(
        regex_pattern, Target_response.decode('UTF-8'))
    return extracted_variables
Time_interval = options.sleep
Thread_num = options.threads
Target_url = options.url

# Collect candidate parameter names: form field names plus JS variable names.
Target_response = send_request(Target_url)
extracted_names = extract_forms_names(Target_response)
extracted_variables = extract_javascript_variables(Target_response)
extracted_parameters = extracted_names + extracted_variables

# Phase 1: keep only parameters whose injected marker value is reflected
# back in the response body.
reflected_parameters = []
for parameter in extracted_parameters:
    request_url = f'{Target_url}?{parameter}=NOOB'
    response = send_request(request_url).decode('UTF-8')
    sleep(Time_interval)
    if 'NOOB' in response:
        reflected_parameters.append(parameter)

# Phase 2: for each reflected parameter, probe which XSS-relevant special
# characters survive the server's filtering.
allowed_character_parameters = []
check_characters = ['"', "'", ">", "<", "{", "}", ";", ":", "&"]

# Open the output file once, and only when -o was given. The original code
# duplicated the entire scan loop inside a bare `except` solely to handle
# the no-output-file case (open(False) raising TypeError), which silently
# swallowed every other error as well.
output_file = open(options.output, 'w') if options.output else None
try:
    for parameter in reflected_parameters:
        parameter_details = {
            "parameter": parameter,
            "allowed_characters": []
        }
        for character in check_characters:
            payload = f'NOOB{character}'
            url_encoded_payload = urllib.parse.quote(payload)
            request_url = f'{Target_url}?{parameter}={url_encoded_payload}'
            response = send_request(request_url).decode('UTF-8')
            sleep(Time_interval)
            # An unencoded echo of the payload means this character is not filtered.
            if payload in response:
                parameter_details['allowed_characters'].append(character)
        allowed_character_parameters.append(parameter_details)
        print(parameter_details)
        if output_file:
            # '\n' — the original wrote the literal character 'n'.
            output_file.write(str(parameter_details) + '\n')
finally:
    if output_file:
        output_file.close()

我想在这些部分发送多线程http请求以加快代码:

for parameter in extracted_parameters:
request_url = f'{Target_url}?{parameter}=NOOB'
response = send_request(request_url).decode('UTF-8')
sleep(Time_interval)
if 'NOOB' in response:
reflected_parameters.append(parameter)
for parameter in reflected_parameters:
parameter_details = {
"parameter": parameter,
"allowed_characters": []
}
for character in check_characters:
payload = f'NOOB{character}'
url_encoded_payload = urllib.parse.quote(payload)
request_url = f'{Target_url}?{parameter}={url_encoded_payload}'
response = send_request(request_url).decode('UTF-8')
sleep(Time_interval)
if payload in response:
parameter_details['allowed_characters'].append(character)
allowed_character_parameters.append(parameter_details)
print(parameter_details)

运行代码的方法示例:

python xssScanner.py -u https://brutelogic.com.br/gym.php -t 10

每次执行requests.get()时,都会重新打开一个连接。如果你必须向同一台主机发送多次请求,可以创建一个会话来复用连接:

mySession = requests.Session()

然后

def send_request(url):
response = mySession.get(url, allow_redirects=True, verify=False)
return response.content

最新更新