Sending a large JSON file in batches to the HubSpot API



I have tried many approaches and tested many scenarios, and I have done a lot of research, but I cannot find the problem or a solution.

My requirement: the HubSpot API accepts only 15k records per request, and we have one large JSON file. So we need to split it and send 15k records at a time to the API; once a batch of 15k has been submitted, the script should sleep for 10 seconds and capture each response, and this process should continue until all records are done.

I tried chunking the data with the code below and with the modulo operator, but I did not get any usable response.

I am not sure whether the code below will work. Can anyone suggest a better approach?

How do I send batches to the HubSpot API, and how do I POST them?

Thanks in advance, this would be a great help!

import requests
import urllib3

with open(r'D:\Users\lakshmi.vijaya\Desktop\Invalidemail\allhubusers_data.json', 'r') as run:
    dict_run = run.readlines()
    dict_ready = (''.join(dict_run))
count = 1000
subsets = (dict_ready[x:x + count] for x in range(0, len(dict_ready), count))
url = 'https://api.hubapi.com/contacts/v1/contact/batch'
headers = {'Authorization': "Bearer pat-na1-**************************", 'Accept': 'application/json', 'Content-Type': 'application/json', 'Transfer-encoding': 'chunked'}
for subset in subsets:
    # print(subset)
    urllib3.disable_warnings()
    r = requests.post(url, data=subset, headers=headers, verify=False,
                      timeout=(15, 20), stream=True)
    print(r.status_code)
    print(r.content)

Error: 400
b'<html>\r\n<head><title>400 Bad Request</title></head>\r\n<body>\r\n<center><h1>400 Bad Request</h1></center>\r\n<hr><center>cloudflare</center>\r\n</body>\r\n</html>\r\n'

Here is another approach I tried:

import requests
import urllib3

with open(r'D:\Users\lakshmi.vijaya\Desktop\Invalidemail\allhubusers_data.json', 'r') as run:
    dict_run = run.readlines()
    dict_ready = (''.join(dict_run))
url = 'https://api.hubapi.com/contacts/v1/contact/batch'
headers = {'Authorization': "Bearer pat-na1***********-", 'Accept': 'application/json', 'Content-Type': 'application/json', 'Transfer-encoding': 'chunked'}
urllib3.disable_warnings()
r = requests.post(url, data=dict_ready, headers=headers, verify=False,
                  timeout=(15, 20), stream=True)
r.iter_content(chunk_size=1000000)
print(r.status_code)
print(r.content)

Error:
    raise SSLError(e, request=request)
requests.exceptions.SSLError: HTTPSConnectionPool(host='api.hubapi.com', port=443): Max retries exceeded with url: /contacts/v1/contact/batch (Caused by SSLError(SSLEOFError(8, 'EOF occurred in violation of protocol (_ssl.c:2396)')))

This is what the JSON data looks like in the large JSON file:

{
    "email": "aaazaj21@yahoo.com",
    "properties": [
        {
            "property": "XlinkUserID",
            "value": 422211111
        },
        {
            "property": "register_time",
            "value": "2021-09-02"
        },
        {
            "property": "linked_alexa",
            "value": 1
        },
        {
            "property": "linked_googlehome",
            "value": 0
        },
        {
            "property": "fan_speed_switch_0x51_",
            "value": 2
        }
    ]
},
{
    "email": "zzz7@gmail.com",
    "properties": [
        {
            "property": "XlinkUserID",
            "value": 13333666
        },
        {
            "property": "register_time",
            "value": "2021-04-24"
        },
        {
            "property": "linked_alexa",
            "value": 1
        },
        {
            "property": "linked_googlehome",
            "value": 0
        },
        {
            "property": "full_colora19_st_0x06_",
            "value": 2
        }
    ]
}

I have also tried wrapping the objects in a list:

[
    {
        "email": "aaazaj21@yahoo.com",
        "properties": [
            {
                "property": "XlinkUserID",
                "value": 422211111
            },
            {
                "property": "register_time",
                "value": "2021-09-02"
            },
            {
                "property": "linked_alexa",
                "value": 1
            },
            {
                "property": "linked_googlehome",
                "value": 0
            },
            {
                "property": "fan_speed_switch_0x51_",
                "value": 2
            }
        ]
    },
    {
        "email": "zzz7@gmail.com",
        "properties": [
            {
                "property": "XlinkUserID",
                "value": 13333666
            },
            {
                "property": "register_time",
                "value": "2021-04-24"
            },
            {
                "property": "linked_alexa",
                "value": 1
            },
            {
                "property": "linked_googlehome",
                "value": 0
            },
            {
                "property": "full_colora19_st_0x06_",
                "value": 2
            }
        ]
    }
]

You have not said whether your JSON file is a representation of an array of objects or just one object. Arrays are converted to Python lists by json.load, and objects are converted to Python dictionaries.
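
For illustration, a minimal sketch (not part of the original answer) showing how the two shapes parse, plus a workaround for the bracket-less sample shown first in the question:

import json

# An array of objects parses to a Python list...
print(type(json.loads('[{"email": "a@example.com"}]')))  # <class 'list'>
# ...and a single object parses to a Python dict.
print(type(json.loads('{"email": "a@example.com"}')))    # <class 'dict'>

# The question's first sample is comma-separated objects with no
# enclosing [ ]; json.loads() rejects that on its own, but wrapping
# the raw text in brackets turns it into a valid JSON array.
raw_text = '{"email": "a@example.com"}, {"email": "b@example.com"}'
print(len(json.loads(f'[{raw_text}]')))  # 2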

Here is some code that assumes it is an array of objects. If it is not an array of objects, see https://stackoverflow.com/a/22878842/839338, but the same principles can be applied.

This assumes you want 15k bytes rather than 15k records. If it is the number of records, you can simplify the code and just pass 15000 as the second argument to chunk_list().

import json
import math
import pprint

# See https://stackoverflow.com/a/312464/839338
def chunk_list(list_to_chunk, number_of_list_items):
    """Yield successive chunk_size-sized chunks from list."""
    for i in range(0, len(list_to_chunk), number_of_list_items):
        yield list_to_chunk[i:i + number_of_list_items]

with open('./allhubusers_data.json', 'r') as run:
    json_data = json.load(run)
desired_size = 15000
json_size = len(json.dumps(json_data))
print(f'{json_size=}')
print(f'Divide into {math.ceil(json_size/desired_size)} sub-sets')
print(f'Number of list items per subset = {len(json_data)//math.ceil(json_size/desired_size)}')
if isinstance(json_data, list):
    print("Found a list")
    sub_sets = chunk_list(json_data, len(json_data)//math.ceil(json_size/desired_size))
else:
    exit("Data not list")
for sub_set in sub_sets:
    pprint.pprint(sub_set)
    print(f'Length of sub-set {len(json.dumps(sub_set))}')
    # Do stuff with the sub sets...
    text_subset = json.dumps(sub_set)  # ...

You may need to adjust the value of desired_size downwards if the text length of the sub-sets varies too much.
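
If record sizes vary a lot, an alternative is to pack records greedily until the serialized batch approaches the byte budget. This is a sketch under that assumption, not part of the original answer; pack_by_bytes and max_bytes are hypothetical names:

import json

def pack_by_bytes(records, max_bytes):
    """Greedily pack records into batches whose JSON text stays under max_bytes."""
    batch = []
    for record in records:
        candidate = batch + [record]
        if batch and len(json.dumps(candidate)) > max_bytes:
            # Adding this record would exceed the budget; emit the batch first.
            yield batch
            batch = [record]
        else:
            batch = candidate
    if batch:
        yield batch  # a single oversized record is still emitted on its own

# Usage: for batch in pack_by_bytes(json_data, 15000): ...

Re-serializing the growing batch on every record is simple but quadratic in the batch size; that is usually acceptable for files of this scale.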

UPDATE in response to the comments: if you simply need 15000 records per request, this code should work for you:

import json
import pprint
import requests

# See https://stackoverflow.com/a/312464/839338
def chunk_list(list_to_chunk, number_of_list_items):
    """Yield successive chunk_size-sized chunks from list."""
    for i in range(0, len(list_to_chunk), number_of_list_items):
        yield list_to_chunk[i:i + number_of_list_items]

url = 'https://api.hubapi.com/contacts/v1/contact/batch'
headers = {
    'Authorization': "Bearer pat-na1-**************************",
    'Accept': 'application/json',
    'Content-Type': 'application/json',
    'Transfer-encoding': 'chunked'
}
with open(r'D:\Users\lakshmi.vijaya\Desktop\Invalidemail\allhubusers_data.json', 'r') as run:
    json_data = json.load(run)
desired_size = 15000
if isinstance(json_data, list):
    print("Found a list")
    sub_sets = chunk_list(json_data, desired_size)
else:
    exit("Data not list")
for sub_set in sub_sets:
    # pprint.pprint(sub_set)
    print(f'Length of sub-set {len(sub_set)}')
    r = requests.post(
        url,
        data=json.dumps(sub_set),
        headers=headers,
        verify=False,
        timeout=(15, 20),
        stream=True
    )
    print(r.status_code)
    print(r.content)
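
The question also asks for a 10-second pause after each batch and for capturing every response. A minimal sketch of that loop, reusing chunk_list, json_data, url and headers from the block above (the responses list and the time.sleep call are illustrative additions, not part of the original answer):

import json
import time
import requests

responses = []  # collect (status, body) for every batch so no response is lost
for sub_set in chunk_list(json_data, 15000):
    r = requests.post(url, data=json.dumps(sub_set), headers=headers,
                      timeout=(15, 20))
    responses.append((r.status_code, r.text))
    time.sleep(10)  # pause 10 seconds between batches, per the requirement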
