如何在调用每个 API 发布请求时迭代示例用户以及端点和有效负载



原问题链接:有两个函数:

  • ReadInp() -> Dict[str, str]这将读取csv并返回一个以url作为键和负载作为值的字典。
  • ReadTUsr() -> List[List[str, str]]这将读取一个单独的csv并返回'sample users'作为列表的列表。每个内部列表都有一个id类型和一个id值。例如:

[['uId', 'T1'], ['srId', 'T2'], ['siId', 'T3'], ['siId', 'T4'], ['srId', 'T5']...]

我需要进行url调用来连接这两个值。被调用的url将来自第一个返回值,但它必须从第二个列表中拼接示例用户。这些列表的大小可能不相同,如果第二个列表在第一个列表之前耗尽,它应该在列表上再次迭代,直到第一个列表耗尽。

例如,结果调用将是:

http://localhost:8080/cabApp/callcab?uId=T1然后POST主体:{'location': '', 'ppl': '2', 'text': 'MyCab'}

或:

http://localhost:8080/cabApp/call3cab?srId=T2then POST body:{'display': 'RMP', 'time': '', 'location': 'LA'}

这是我已经尝试过的:试验1

def makeAcall():
r = ReadInp() # gives dict with endpoint as key and payload as Value 
# This dict looks like this >> {'cabApp/callcab': {'location': '', 'ppl': '2', 'text': 'MyCab'}, 'cabApp/call3cab': {'display': 'RMP', 'time': '', 'location': 'LA'}, ...}  
t = ReadTUsr() # give nested user list 
# This list looks like this >> [['uId', 'T1'], ['srId', 'T2'], ['siId', 'T3'], ['siId', 'T4'], ['srId', 'T5']...]
print(t)
print(type(r))
#for k1,v1 in r.items():
#    for i,j in t:

for (k1,v1), (i,j) in zip(r.items(), t):
url = f"{AURL}(k1)}"
rj = v1
#uid = {"uId": "T1"}
headers = {'Accept': "application/json"}
res = requests.post(url, json=rj, headers=headers, params=(i,j))
print("Req Resp Code:",res.status_code)
#api_req_res_text.append(res.text)
api_req_res_code.append(res.status_code)

return api_req_res_code

试验2

def equalize_lists(users: list, desired_length) -> list:
if len(users) > desired_length:
return users[:desired_length]
else:  # We need to extend the user list
#repeat, extra = (desired_length // len(users), desired_length % len(users))
repeat, extra = divmod(desired_length, len(users))
users *= repeat + (extra > 0)
#for _ in range(repeat):
#users.extend(users)
#users.extend(users[:extra])
return users
def makeAcall():
r = ReadInp() # gives dict with endpoint as key and payload as Value 
# This dict looks like this >> {'cabApp/callcab': {'location': '', 'ppl': '2', 'text': 'MyCab'}, 'cabApp/call3cab': {'display': 'RMP', 'time': '', 'location': 'LA'}, ...}  
t = ReadTUsr() # give nested user list 
# This list looks like this >> [['uId', 'T1'], ['srId', 'T2'], ['siId', 'T3'], ['siId', 'T4'], ['srId', 'T5']...]
print(t)
print(type(r))

users = equalize_lists(t, len(r))
#users *= repeat + (extra > 0)
for ((k1, v1), (i, j)) in (zip(r.items(), users)):

url = f"{AURL}{k1}"
rj = v1
headers = {'Accept': "application/json"}
res = requests.post(url, json=rj, headers=headers, params=(i,j))

#uid = cycle(t)
print("Req Resp Code:",res.status_code)
#api_req_res_text.append(res.text)
api_req_res_code.append(res.status_code)

return api_req_res_code
  • 注一这是我的字典看起来像{'cabApp/callcab1': {'location': '', 'ppl': '2', 'text': 'MyCab'}, 'cab2App/call2cab2': {'ppl': '', 'text': 'Cab2'}, 'cabApp/call3cab': {'display': 'RMP', 'time': '', 'location': 'LA'}...}这里cabApp/callcab1cab2App/call2cab2&cabApp/call3cab是键,它们是端点[在代码中迭代为k1],其余的都是它们各自的有效载荷[迭代为v1]。http://localhost:8080部分存储在url中,并且是静态的。我不能按照指令更新构建API的代码[res我正在发出post请求]。我正在寻找的是迭代列表的方式,并将其用作参数之后的值。
  • 注2:我们可以改变它读取样本用户csv的方式。现在我正在使用这段代码来读取示例用户csv,它给出了嵌套列表。
def ReadTUsr():
with open(T_U_csv, 'r') as file:
csv_file = csv.reader(file)
data = [it for it in csv_file]
#print(data)
return data 
  • Note3:早期的ReadTUsr()是字典,并按预期工作(能够迭代并为每个请求调用新的示例用户),但最新的代码在字典中有重复的键(多次siId&srId),所以它只从重复键中获取一条记录,这是默认的python行为。
  • ,如何迭代示例用户(i,j)以及端点&负载(k1,v1)调用每个post请求res变量。
def equalize_lists(users: list, desired_length) -> list:
if len(users) > desired_length:
return users[:desired_length]
else:  # We need to extend the user list
#repeat, extra = (desired_length // len(users), desired_length % len(users))
repeat, extra = divmod(desired_length, len(users))
users *= repeat + (extra > 0)
#for _ in range(repeat):
#users.extend(users)
#users.extend(users[:extra])
return users
def makeAcall():
r = ReadInp() # gives dict with endpoint as key and payload as Value 
# This dict looks like this >> {'cabApp/callcab': {'location': '', 'ppl': '2', 'text': 'MyCab'}, 'cabApp/call3cab': {'display': 'RMP', 'time': '', 'location': 'LA'}, ...}  
t = ReadTUsr() # give nested user list 
# This list looks like this >> [['uId', 'T1'], ['srId', 'T2'], ['siId', 'T3'], ['siId', 'T4'], ['srId', 'T5']...]
print(t)
print(type(r))

users = equalize_lists(t, len(r))
# or use this >> users = itertools.cycle(ReadInp())
#users *= repeat + (extra > 0)
for ((k1, v1), (i, j)) in (zip(r.items(), users)):

url = f"{AURL}{k1}"
rj = v1
headers = {'Accept': "application/json"}
res = requests.post(url, json=rj, headers=headers, params={i,j})

#uid = cycle(t)
print("Req Resp Code:",res.status_code)
#api_req_res_text.append(res.text)
api_req_res_code.append(res.status_code)

return api_req_res_code

相关内容

最新更新