如何在 python 和 mongodb 中优化这一系列的请求和迭代



我有一个stock这个形式的集合:

{
_id: ObjectId("5e132f29009502d4e85e1293"),
Product: ObjectId("5e132f29009502c4e97e8796"),
Stock: [
{
Qty: 50, 
Expiration Date: 2022-05-01T00:00:00.000+00:00
}
]
}

此集合包含每个产品的当前库存。大约有 5000 个条目。

现在我必须在给定日期评估股票。为此,我使用一个简单的公式:

stock = actual_stock + total_output - total_input

我有一个用于产品输入(到达收集(的集合,另一个用于输出操作(申请收集(的集合:

抵达领取

{
_id: ObjectId("5e26eed55c0e07995d9f2cd0"),
Order Number: 200049,
Reception: [
{Product: ObjectId(5e132f3e009502d4e85e2af4), Qty: 10, Expiration Date: 2022-05-01T00:00:00.000+00:00}
],
Date: 2020-01-21T13:30:13.529+00:00
}

申请收集

{
_id: ObjectId("5e26eed55c0e07995d9f2cd0"),
Requisition Number: 200049,
Products: [
{Product: ObjectId(5e132f3e009502d4e85e2af4), Qty: 10, Expiration Date: 2022-05-01T00:00:00.000+00:00}
],
Date: 2020-01-21T13:30:13.529+00:00
}

这些文档中显然还有其他信息,这只是显示其组成的摘录。

现在这是python代码:

# imports ...
stock_db = mongo.db.Stock
arrival_db = mongo.db.Arrival
requisition_db = mongo.db.Requisitions

def check_arrival_product(product, date):
check_arrival = arrival_db.aggregate([{'$unwind': '$Reception'},
{'$match': {
'Reception.Product': ObjectId(product),
'$and': [
{'Reception.Date':
{'$gte': date}
}]}
}])
qty = 0
for i in check_arrival:
qty += i['Reception'].get('Qty')
return qty

def check_requisition_product(product, date):
check_requisition = requisition_db.aggregate([{'$unwind': '$Products'},                                         
{'$match': {
'Products.Product': ObjectId(product),
'$and': [
{'Date':
{'$gte': date}
}]}
}])
qty = 0
for i in check_requisition:
qty += i['Products']['Qty']
return qty

def main(date):
# ....
check_stock = stock_db.find()
check_stock.batch_size(1000)
for i in check_stock:
stock = 0
for j in i['Stock']:
stock += j['Qty']
total_arrival = check_arrival_product(i['Product'], date)
total_requisition = check_requisition_product(i['Product'], date)
stock = stock + total_requisition - total_arrival
# ....         

正如您在 main 函数中看到的,我迭代了 5000 种产品,对于每种产品,我必须评估在给定日期输入和取出的股票,以便计算该日期的库存。

主要问题是操作需要长达 4 分钟,这太长了。

PS:数据库在同一台计算机上。

那么如何优化这种操作呢?

我的第一个想法是将数据非规范化。即:创建一个新的集合,例如"交易",并在 productId 和日期上添加索引,并将所有交易添加到其中,包括申请和到达。可以在每次新事务到达时在运行时执行此操作,也可以使用两个具有 $out/$merge 阶段的聚合管道作为批处理作业执行此操作。

对于批处理作业,它应该是这样的:

transaction_db.createIndex{
"productId":1,
"date":1
}
requisition_db.aggregate([
{'$unwind': '$Products'},                                         
//TODO: map productId, date, delta=-Qty
{'$out': 'transaction_db'}                                               
])
arrival_db.aggregate([
{'$unwind': '$Products'},                                         
//TODO: map productId, date, delta=+Qty 
{'$merge': {into: 'transaction_db'}}                                               
])

在此新集合上,为每个 productId 创建库存将是使用 $group 阶段的单个聚合管道。

transaction_db.aggregate([ { 
$group: { 
_id: {productId: "$productId", date: "$date"}, 
deltaPerDay: { $sum: "$delta" }
} 
} ] )

另一个想法是看看从产品到申请或到货的$lookup阶段。但是为此,您需要先解除它们以获得单独的产品交易,我不确定该怎么做。

如果你只有 5000 个产品,如果你把它们都保存在内存中并在 python 端计算增量,你可能会更快。

  • 首先对当前股票执行 findAll,并将所有这些股票保存在按 id 索引的字典中。
  • 然后使用 findAll 读取所有申请并更新内存中的库存。
  • 然后使用 findAll 读取所有到达并更新内存中的库存。

如果您有足够的内存,这是一个更容易实现的实现。

最新更新