我正在尝试将8m[rows]*1k[columns]python数据帧加载到Mongo中。为了提高性能,我计划使用Mongo批量操作。我每天都要更新收藏品,所以我使用了批量操作的追加销售方法。下面是我准备的代码,
def BulkLoad(self,Dataset):
counter = 0;
empty = '000'
columns = []
records = []
DataDict = {}
for col in Dataset.columns:
columns.append(col)
try:
db = self.getDatabase()
bulk = db.collection.initialize_ordered_bulk_op()
for j in range(len(Dataset)):
records.clear()
DataDict.clear()
DataDict.update(
{'CreatedBy': empty, 'ModifiedBy': empty, 'Value': Score})
for column in columns:
colValue = str(Dataset[column][j])
if (colValue == 'nan'):
colValue = colValue.replace('nan', '')
DataDict.update({column: colValue})
records.append(DataDict)
print("list is ",records)
Id = DataDict['Id']
Number = DataDict['Number']
print(DataDict)
bulk.find(
{'Id': Id, 'Number': Number}).upsert().update(
{
'$set': {'Id': Id, 'Number': Number,'Events':records}
})
counter += 1
if counter % 1000 == 0:
result = bulk.execute()
logging.info(pprint(result))
bulk = db.coll.initialize_ordered_bulk_op()
if counter % 1000 != 0:
result = bulk.execute()
logging.info(pprint(result))
except Exception as e:
logging.exception(e)
except BulkWriteError as bre:
logging.error(pprint(bre.details))
如果我将样本行10加载到Mongo集合中,那么所有文档都具有相同的第10行值。我知道它是因为python字典引用问题。
你能请谁给我建议吗?
def BulkLoad(self,Dataset):
counter = 0;
empty = '000'
columns = []
records = []
for col in Dataset.columns:
columns.append(col)
try:
db = self.getDatabase()
bulk = db.collection.initialize_ordered_bulk_op()
for j in range(len(Dataset)):
DataDict = {}
DataDict.update(
{'CreatedBy': empty, 'ModifiedBy': empty, 'Value': Score})
for column in columns:
colValue = str(Dataset[column][j])
if (colValue == 'nan'):
colValue = colValue.replace('nan', '')
DataDict.update({column: colValue})
records.append(DataDict)
print("list is ",records)
Id = DataDict['Id']
Number = DataDict['Number']
print(DataDict)
bulk.find(
{'Id': Id, 'Number': Number}).upsert().update(
{
'$set': {'Id': Id, 'Number': Number,'Events':records}
})
counter += 1
if counter % 1000 == 0:
result = bulk.execute()
logging.info(pprint(result))
bulk = db.coll.initialize_ordered_bulk_op()
if counter % 1000 != 0:
result = bulk.execute()
logging.info(pprint(result))
except Exception as e:
logging.exception(e)
except BulkWriteError as bre:
logging.error(pprint(bre.details))