Extracting gzip files from requests with multithreading



I'm trying to extract a batch of gzip files (62,000 of them). Inside each file is a text document formatted as JSON. Right now I'm downloading all of these files with the requests module, multithreaded:

import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_file(url, filename):
    try:
        html = requests.get(url, stream=True, allow_redirects=True)
        with open('Streams Total Daily/' + filename + '.json.gz', 'wb') as out:
            out.write(html.content)
        return html.status_code
    except requests.exceptions.RequestException as e:
        return e

def get_streams():
    threads = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        for uri in country_uris:
            split = uri.split('/')
            filename = 'streams_' + split[1] + '_' + split[4] + '_' + split[5] + '_' + split[6] + '_' + split[7]
            url = f"{link}{uri}?access_token={access_token}"
            threads.append(executor.submit(fetch_file, url, filename))

        for task in as_completed(threads):
            print(task.result())

get_streams()
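
A side note on the download itself: with stream=True, requests only actually streams if the body is consumed via iter_content(); reading html.content as above buffers the whole file in memory first. A minimal sketch of a chunked variant (fetch_file_streamed is a hypothetical name, taking the same url/filename arguments as above):

import requests

def fetch_file_streamed(url, filename):
    # stream the body to disk in 64 KiB chunks instead of
    # buffering the whole response in html.content
    with requests.get(url, stream=True, allow_redirects=True) as resp:
        with open('Streams Total Daily/' + filename + '.json.gz', 'wb') as out:
            for chunk in resp.iter_content(chunk_size=65536):
                out.write(chunk)
        return resp.status_code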

I have some code that loops over the folder the files end up in and unpacks them, but with 62,000 files that takes a long time. I tried a few versions that pass the response through gzip.GzipFile(), but that just gives me empty files:

import gzip
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_file(url, filename):
    try:
        html = requests.get(url, stream=True, allow_redirects=True)
        gz = gzip.GzipFile(fileobj=html.content)  # this is the attempt that yields empty files
        with open('test/' + filename + '.json', 'wb') as out:
            out.write(gz.read())
        return html.status_code
    except requests.exceptions.RequestException as e:
        return e

def get_streams():
    threads = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        for uri in country_uris:
            split = uri.split('/')
            filename = 'streams_' + split[1] + '_' + split[4] + '_' + split[5] + '_' + split[6] + '_' + split[7]
            url = f"{link}{uri}?access_token={access_token}"
            threads.append(executor.submit(fetch_file, url, filename))

        for task in as_completed(threads):
            print(task.result())

get_streams()

Does anyone know how to approach this? Any advice or solutions are much appreciated!
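
For context on why the gzip.GzipFile() attempt fails: the fileobj parameter expects a file-like object, but html.content is raw bytes. A minimal sketch of the usual fix, wrapping the bytes in io.BytesIO (fetch_and_unpack is a hypothetical name; gzip.decompress(html.content) would work as well):

import gzip
import io
import requests

def fetch_and_unpack(url, filename):
    html = requests.get(url, allow_redirects=True)
    # io.BytesIO wraps the byte string in a file-like object,
    # which is what GzipFile's fileobj parameter requires
    gz = gzip.GzipFile(fileobj=io.BytesIO(html.content))
    with open('test/' + filename + '.json', 'wb') as out:
        out.write(gz.read())
    return html.status_code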

What finally worked for me was the zlib module: first get the raw bytes with response.content, then decompress the data with zlib.decompress(response.content, 16 + zlib.MAX_WBITS), and finally write the decompressed data to a .json file:

import zlib
import requests
from concurrent.futures import ThreadPoolExecutor

def get_files(i):
    file_url = f"{link}{i}"  # `link` is the API base URL from the code above
    elements = i.split('/')
    name = elements[1] + '_' + elements[3] + '_' + elements[4] + '_' + elements[5] + '_' + elements[6] + '_' + elements[7]
    try:
        response = requests.get(url=file_url, headers=headers, allow_redirects=True).content
        # 16 + zlib.MAX_WBITS tells zlib to expect a gzip header and trailer
        decompressed_data = zlib.decompress(response, 16 + zlib.MAX_WBITS)
        with open(f"Streams Total Daily/{name}.json", 'wb') as out:
            out.write(decompressed_data)
    except requests.exceptions.RequestException as e:
        return e

def runner():
    threads = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        for i in country_files:
            threads.append(executor.submit(get_files, i))

runner()
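
As a footnote to this approach: passing 16 + zlib.MAX_WBITS as the wbits argument tells zlib to expect a gzip header and trailer. The standard-library gzip.decompress handles the same gzip-wrapped data, so the decompression step could equivalently be written as:

import gzip

# equivalent to zlib.decompress(response, 16 + zlib.MAX_WBITS)
decompressed_data = gzip.decompress(response)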
