Question about bulk loading into MongoDB using the upsert method



I am trying to load an 8M-row by 1K-column pandas DataFrame into MongoDB. To improve performance, I plan to use MongoDB bulk operations. The collection has to be refreshed every day, so I am using the upsert method of the bulk operation. Below is the code I have prepared:

import logging
from pprint import pformat

from pymongo.errors import BulkWriteError


def BulkLoad(self, Dataset):

    counter = 0
    empty = '000'
    columns = []
    records = []
    DataDict = {}
    for col in Dataset.columns:
        columns.append(col)
    try:
        db = self.getDatabase()
        bulk = db.collection.initialize_ordered_bulk_op()
        for j in range(len(Dataset)):
            # The same list and dict objects are cleared and reused for every row
            records.clear()
            DataDict.clear()
            DataDict.update(
                    {'CreatedBy': empty, 'ModifiedBy': empty, 'Value': Score})  # Score is defined elsewhere
            for column in columns:
                colValue = str(Dataset[column][j])
                if colValue == 'nan':
                    colValue = colValue.replace('nan', '')

                DataDict.update({column: colValue})
            records.append(DataDict)
            print("list is ", records)
            Id = DataDict['Id']
            Number = DataDict['Number']
            print(DataDict)
            # Queue an upsert keyed on Id/Number; the update document refers to `records`
            bulk.find(
                    {'Id': Id, 'Number': Number}).upsert().update(
                    {
                         '$set': {'Id': Id, 'Number': Number, 'Events': records}
                    })
            counter += 1
            if counter % 1000 == 0:
                # Flush every 1000 queued operations and start a new batch
                result = bulk.execute()
                logging.info(pformat(result))
                bulk = db.collection.initialize_ordered_bulk_op()
        if counter % 1000 != 0:
            # Flush whatever is left in the final partial batch
            result = bulk.execute()
            logging.info(pformat(result))
    except BulkWriteError as bre:
        logging.error(pformat(bre.details))
    except Exception as e:
        logging.exception(e)

If I load a sample of 10 rows into the Mongo collection, every document ends up with the values of the 10th row. I know this happens because of the Python dictionary reference problem.
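A minimal sketch of that reference problem, independent of Mongo (the names here are just for illustration): clearing and reusing one dict means every appended list element is the same object, while creating a new dict per iteration keeps the elements independent.

shared = {}
rows_shared = []
for i in range(3):
    shared.clear()
    shared.update({'row': i})
    rows_shared.append(shared)          # appends a reference, not a copy
print(rows_shared)                      # [{'row': 2}, {'row': 2}, {'row': 2}]

rows_fresh = []
for i in range(3):
    fresh = {'row': i}                  # new dict object on every iteration
    rows_fresh.append(fresh)
print(rows_fresh)                       # [{'row': 0}, {'row': 1}, {'row': 2}]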

Can anyone please advise me on how to fix this?

Latest update

Here is the revised code, where DataDict is now created inside the loop:

def BulkLoad(self, Dataset):

    counter = 0
    empty = '000'
    columns = []
    records = []
    for col in Dataset.columns:
        columns.append(col)
    try:
        db = self.getDatabase()
        bulk = db.collection.initialize_ordered_bulk_op()
        for j in range(len(Dataset)):
            # A fresh dict is now created for every row...
            DataDict = {}
            DataDict.update(
                    {'CreatedBy': empty, 'ModifiedBy': empty, 'Value': Score})
            for column in columns:
                colValue = str(Dataset[column][j])
                if colValue == 'nan':
                    colValue = colValue.replace('nan', '')

                DataDict.update({column: colValue})
            # ...but `records` is still the same list shared by every queued update
            records.append(DataDict)
            print("list is ", records)
            Id = DataDict['Id']
            Number = DataDict['Number']
            print(DataDict)
            bulk.find(
                    {'Id': Id, 'Number': Number}).upsert().update(
                    {
                         '$set': {'Id': Id, 'Number': Number, 'Events': records}
                    })
            counter += 1
            if counter % 1000 == 0:
                result = bulk.execute()
                logging.info(pformat(result))
                bulk = db.collection.initialize_ordered_bulk_op()
        if counter % 1000 != 0:
            result = bulk.execute()
            logging.info(pformat(result))
    except BulkWriteError as bre:
        logging.error(pformat(bre.details))
    except Exception as e:
        logging.exception(e)

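For reference, I understand the same per-row upsert can also be expressed with PyMongo's bulk_write and UpdateOne(upsert=True), which replace the deprecated initialize_ordered_bulk_op in newer driver versions. This is only a sketch under my assumptions: a plain collection handle passed in, the Id / Number / Events fields from the code above, and a hypothetical bulk_upsert / batch_size naming.

import logging
from pprint import pformat

from pymongo import UpdateOne
from pymongo.errors import BulkWriteError


def bulk_upsert(collection, Dataset, batch_size=1000):
    ops = []
    for j in range(len(Dataset)):
        # Build an independent dict for every row, so nothing is shared between operations
        doc = {}
        for column in Dataset.columns:
            value = str(Dataset[column][j])
            doc[column] = '' if value == 'nan' else value
        ops.append(UpdateOne(
                {'Id': doc['Id'], 'Number': doc['Number']},
                {'$set': {'Id': doc['Id'], 'Number': doc['Number'], 'Events': [doc]}},
                upsert=True))
        if len(ops) == batch_size:
            # Flush a full batch and start a new one
            try:
                collection.bulk_write(ops, ordered=True)
            except BulkWriteError as bre:
                logging.error(pformat(bre.details))
            ops = []
    if ops:
        # Flush the final partial batch
        collection.bulk_write(ops, ordered=True)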