Python MongoDB query: splitting request data into chunks



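I am writing a Python script that performs the steps below.

  • Query a MongoDB database
  • Parse and aggregate the results
  • Upload the data to a ServiceNow table via a REST API

The script works, but the data set is so large that the REST transaction times out after 60 seconds (the connection is closed by the destination server).

I need to split the data into chunks and send a separate REST transaction for each chunk, so that the complete data set is sent via POST without hitting the timeout limit.

How can I achieve that by modifying the script below?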

#!/usr/bin/env python
from config import *
import os, sys
mypath = os.path.dirname(os.path.realpath(__file__))
sys.path.append(os.path.join(mypath, "api-python-client"))
from apiclient.mongo import *
from pymongo import MongoClient
import json
import requests
from bson.json_util import dumps
client = MongoClient(mongo_uri)
#Create ServiceNow URL
svcnow_url = create_svcnow_url('u_imp_cmps')
#BITSDB Nmap Collection
db = client[mongo_db]
#Aggregate - RDBMS equivalent to Alias select x as y
#Rename fields to match ServiceNow field names
computers = db['computer'].aggregate([
        {"$unwind": "$hostnames"},
        {"$project" : {
                "_id":0,
                "u_hostname": "$hostnames.name",
                "u_ipv4": "$addresses.ipv4",
                "u_status": "$status.state",
                "u_updated_timestamp": "$last_seen"
        }}
])
j = dumps({"records":computers})
#print(j)

#Set proper headers
headers = {"Content-Type":"application/json","Accept":"application/json"}
#Build HTTP Request
response = requests.post(url=svcnow_url, auth=(svcnow_user, svcnow_pwd), headers=headers ,data=j)
#Check for HTTP codes other than 200
if response.status_code != 200:
        print('Status:', response.status_code, 'Headers:', response.headers, 'Response Text', response.text, 'Error Response:',response.json())
        exit()
#Decode the JSON response into a dictionary and use the data
print('Status:',response.status_code,'Headers:',response.headers,'Response:',response.json())

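Update: I have a plan, but I am not sure exactly how to implement it.

  • Set the cursor to a fixed batch size of 1000 records
  • When a batch is full, create the JSON output and send the data via requests
  • In a loop: keep grabbing new batches and sending each one to the destination until the entire data set has been sent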

https://docs.mongodb.com/v3.0/reference/method/cursor.batchsize/

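Basically, I think I can solve this by creating batches and looping through them, making a new API call for each batch. Please let me know whether this is a good plan and how to implement it. Thanks.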

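Note that j = dumps({"records":computers}) returns a single JSON string, not a list, so looping over j would walk through it one character at a time. Read the cursor into a list instead (records = list(computers)); you can then point to individual entries with records[x] or with a for loop and serialize each one separately. Assuming the endpoint accepts the same {"records": [...]} wrapper with a single entry, each of these should be accepted by ServiceNow.

# Read all documents from the aggregation cursor into a list.
# This replaces the j = dumps(...) line; a cursor can only be consumed once.
records = list(computers)
# Set proper headers (these are always the same, so this
# can be assigned outside of the for loop)
headers = {"Content-Type":"application/json","Accept":"application/json"}
for data_point in records:
    # Serialize one record, keeping the {"records": [...]} wrapper of the original script
    payload = dumps({"records": [data_point]})
    #Build HTTP Request (Note we are sending payload instead of j)
    response = requests.post(url=svcnow_url, auth=(svcnow_user, svcnow_pwd), headers=headers, data=payload)
    #Check for HTTP codes other than 200
    if response.status_code != 200:
        # response.text is printed as-is because an error body may not be valid JSON
        print('Status:', response.status_code, 'Headers:', response.headers, 'Response Text', response.text)
    else:
        # This is a response of success for a single record
        print('Status:',response.status_code,'Headers:',response.headers,'Response:',response.json())
exit()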

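If you have 100 new entries in MongoDB, this will make 100 POST calls to ServiceNow. Your ServiceNow instance should be able to handle the load, and you can easily identify which records failed to load.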

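However, if you need to reduce the number of calls for any reason, I suggest splitting the list of records into "sublists", for example with the one-liner from this answer. Because dumps returns a string, the slicing is applied to a list read from the cursor, and each sublist is serialized on its own:

# Read all documents from the aggregation cursor into a list
# (this replaces the j = dumps(...) line of the original script)
records = list(computers)
# Set proper headers (these are always the same, so this
# can be assigned outside of the for loop)
headers = {"Content-Type":"application/json","Accept":"application/json"}
# Each POST will send up to 10 records of data
split_size = 10
# Note the two places where our split_size variable is used
# (range replaces xrange, which only exists in Python 2)
for chunk in [records[x:x+split_size] for x in range(0, len(records), split_size)]:
    # Serialize one sublist of records per request
    payload = dumps({"records": chunk})
    #Build HTTP Request (Note we are sending payload instead of j)
    response = requests.post(url=svcnow_url, auth=(svcnow_user, svcnow_pwd), headers=headers, data=payload)
    #Check for HTTP codes other than 200
    if response.status_code != 200:
        # response.text is printed as-is because an error body may not be valid JSON
        print('Status:', response.status_code, 'Headers:', response.headers, 'Response Text', response.text)
    else:
        # This is a response of success for one batch of records
        print('Status:',response.status_code,'Headers:',response.headers,'Response:',response.json())
exit()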
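
The plan from the question (fill a batch of 1000 records, send it, repeat until the cursor is exhausted) can also be sketched without reading the whole result set into memory first. As far as I know, the cursor's batch size only controls how many documents MongoDB returns per round trip; iterating the cursor still yields one document at a time, so the grouping has to happen on the client. The names batched, upload_in_batches, send and serialize below are hypothetical helpers for illustration, not part of PyMongo or requests, and the sketch assumes the endpoint answers 200 on success as in the original script.

```python
import json
from itertools import islice


def batched(iterable, size):
    """Yield successive lists of at most `size` items from any iterable."""
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def upload_in_batches(documents, send, batch_size=1000, serialize=json.dumps):
    """Send every batch as its own request body.

    `send` is a caller-supplied function (an assumption of this sketch) that
    takes the serialized body and returns the HTTP status code.
    Returns the zero-based indexes of the batches that did not return 200.
    """
    failed = []
    for index, batch in enumerate(batched(documents, batch_size)):
        # Keep the {"records": [...]} wrapper used by the original script
        payload = serialize({"records": batch})
        if send(payload) != 200:
            failed.append(index)
    return failed
```

In the script above, documents would be the computers cursor, serialize would be bson.json_util.dumps (so that BSON types such as dates are handled), and send could be a small function that calls requests.post with svcnow_url, the credentials and the headers and returns response.status_code. Returning the failed batch indexes instead of exiting makes it possible to retry or log only the batches that did not go through.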
