I need an optimal way to convert a JSON response in Python

I am trying to build a DataFrame from an API response. To speed this up I run parallel threads, but it still takes a very long time. A code sample:

def parall_func(tuple):
    output = pd.DataFrame()
    list_caracts = list(map(str, tuple[2]))
    item = [(tuple[1])]
    q = len(list_caracts)

    headers = {
        'Content-Type': 'application/json'
    }
    raw_data = json.dumps(
        {"item": item, "list_caracts": list_caracts, "sizePage": q, "numberPage": 1}
    )
    try:
        url = "https://thisisaurl.com/rep/store"
        response = requests.get(url, headers=headers, data=raw_data)
        resp_to_json = json.loads(response.text)
        for i in resp_to_json['tag']:
            output = output.append([i])

    except:
        print("Error: ", sys.exc_info()[0])
        raise
    return output

pool = Threads(cpu_count())
df_parall = list(pool.imap(parall_func, df_queries.itertuples(name=None)))
pool.close()
Final = pd.concat(df_parall, ignore_index=True)

Can you help me correct this, or suggest a different logic or structure than pandas?

The final response has about 3 million records.

Once I have the structure, I need to do some calculations and then connect to the database with pyodbc to save the data.
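
For reference, the save step I have in mind looks roughly like this (the connection string, table, and column names are placeholders), using pyodbc's fast_executemany so millions of rows go over in batches rather than one round trip per row:

import pyodbc

# Hypothetical connection string and target table; adjust to the real schema.
conn = pyodbc.connect("DSN=mydsn;UID=user;PWD=secret")
cursor = conn.cursor()
cursor.fast_executemany = True  # send parameter sets in batches
cursor.executemany(
    "INSERT INTO results_table (col_a, col_b) VALUES (?, ?)",
    list(Final.itertuples(index=False, name=None)),
)
conn.commit()
conn.close()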

The two things I would try are:

  1. Create a requests.Session instance and use it to issue the GET requests. From the documentation:

The Session object allows you to persist certain parameters across requests. It also persists cookies across all requests made from the Session instance, and will use urllib3's connection pooling. So if you're making several requests to the same host, the underlying TCP connection will be reused, which can result in a significant performance increase (see HTTP persistent connection).

  2. Since you are doing multithreaded I/O, restricting yourself to a number of threads equal to the number of cores you have will hurt performance, because the threads spend most of their time waiting on the network. Try creating 500 threads instead; the only question is whether the site will complain about too many requests per second (see the pool-sizing sketch after this list).
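
One caveat on combining both points, as promised above: requests sizes its per-host connection pool at 10 by default, so 500 threads sharing one Session would keep discarding and reopening connections. A minimal sketch that mounts a larger pool (the numbers are just matched to the suggested thread count, not tuned values):

from requests import Session
from requests.adapters import HTTPAdapter

session = Session()
# urllib3 keeps only 10 connections per host by default; size the pool to
# the thread count so every thread can hold on to a reusable connection.
adapter = HTTPAdapter(pool_connections=1, pool_maxsize=500)
session.mount("https://", adapter)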

Incidentally, the source code has an indentation error. I have supplied the missing import statements I believe are needed, and I have renamed the parameter tuple to tpl, since tuple is a built-in type and you should not shadow built-ins without a very good reason.

from multiprocessing.pool import ThreadPool as Threads
from requests import Session
from functools import partial
import pandas as pd
import json
import sys

def parall_func(session, tpl):
    output = pd.DataFrame()
    list_caracts = list(map(str, tpl[2]))
    item = [(tpl[1])]
    q = len(list_caracts)

    raw_data = json.dumps(
        {"item": item, "list_caracts": list_caracts, "sizePage": q, "numberPage": 1}
    )
    try:
        url = "https://thisisaurl.com/rep/store"
        response = session.get(url, data=raw_data)
        resp_to_json = json.loads(response.text)
        for i in resp_to_json['tag']:
            output = output.append([i])
    except:
        print("Error: ", sys.exc_info()[0])
        raise
    return output

with Session() as session:
    headers = {
        'Content-Type': 'application/json'
    }
    session.headers = headers
    pool = Threads(500)
    df_parall = list(pool.imap(partial(parall_func, session), df_queries.itertuples(name=None)))
    pool.close()
    Final = pd.concat(df_parall, ignore_index=True)

Update

Another thing you can try is to replace building up the output variable through repeated append calls with a single concat:

def parall_func(session, tpl):
    list_caracts = list(map(str, tpl[2]))
    item = [(tpl[1])]
    q = len(list_caracts)

    raw_data = json.dumps(
        {"item": item, "list_caracts": list_caracts, "sizePage": q, "numberPage": 1}
    )
    try:
        url = "https://thisisaurl.com/rep/store"
        response = session.get(url, data=raw_data)
        resp_to_json = json.loads(response.text)
        dataframes = [pd.DataFrame([i]) for i in resp_to_json['tag']]
        output = pd.concat(dataframes)
    except:
        print("Error: ", sys.exc_info()[0])
        raise
    return output
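
As a side note: if each element of resp_to_json['tag'] is a flat dict (an assumption about your payload), pandas can build the whole frame in one constructor call, which avoids creating the per-row DataFrame objects entirely:

# Assumes each element of resp_to_json['tag'] is a flat dict.
output = pd.DataFrame(resp_to_json['tag'])
# For nested JSON, pd.json_normalize(resp_to_json['tag']) flattens it first.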

If the above does not improve performance, the last thing to try would be to use multiprocessing to create the dataframes:

from multiprocessing.pool import ThreadPool as Threads, Pool as MultiProcessingPool
from requests import Session
from functools import partial
import pandas as pd
import json
import sys

def create_data_frames(response):
    resp_to_json = json.loads(response.text)
    dataframes = [pd.DataFrame([i]) for i in resp_to_json['tag']]
    # Perhaps you might want to specify ignore_index=True on the following:
    output = pd.concat(dataframes)
    return output

def parall_func(session, multiprocessing_pool, tpl):
    list_caracts = list(map(str, tpl[2]))
    item = [(tpl[1])]
    q = len(list_caracts)

    raw_data = json.dumps(
        {"item": item, "list_caracts": list_caracts, "sizePage": q, "numberPage": 1}
    )
    try:
        url = "https://thisisaurl.com/rep/store"
        response = session.get(url, data=raw_data)
        output = multiprocessing_pool.apply(create_data_frames, args=(response,))
    except:
        print("Error: ", sys.exc_info()[0])
        raise
    return output

with Session() as session:
    headers = {
        'Content-Type': 'application/json'
    }
    session.headers = headers
    multiprocessing_pool = MultiProcessingPool()
    pool = Threads(500)
    df_parall = list(pool.imap(partial(parall_func, session, multiprocessing_pool), df_queries.itertuples(name=None)))
    multiprocessing_pool.close()
    multiprocessing_pool.join()
    pool.close()
    pool.join()
    Final = pd.concat(df_parall, ignore_index=True)
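
One design note on the multiprocessing variant: arguments passed to apply are pickled to cross the process boundary, and the full Response object carries more baggage than its body. If that overhead shows up when profiling, a variant that ships only the text is cheaper (create_data_frames_from_text is a hypothetical helper name):

def create_data_frames_from_text(text):
    # Same as create_data_frames, but receives the already-extracted body,
    # so only a plain str is pickled between processes.
    resp_to_json = json.loads(text)
    return pd.concat(pd.DataFrame([i]) for i in resp_to_json['tag'])

# Then, inside parall_func:
# output = multiprocessing_pool.apply(create_data_frames_from_text, args=(response.text,))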
