从快照文档中获取当前状态-mongoDB



我正试图从集合中获取特定时间的当前持有者列表。我的收藏如下:

[
{
"time": 1,
"holdings": [
{ "owner": "A", "tokens": 2 },
{ "owner": "B", "tokens": 1 }
]
},
{
"time": 2,
"holdings": [
{ "owner": "B", "tokens": 2 }
]
},
{
"time": 3,
"holdings": [
{ "owner": "A", "tokens": 3 },
{ "owner": "B", "tokens": 1 },
{ "owner": "C", "tokens": 1 }
]
},
{
"time": 4,
"holdings": [
{ "owner": "C", "tokens": 0 }
]
}
]

tokens显示所有者的当前持有量,如果持有量已更改为最后一个文档。我想更改集合,使holdings始终包括任何时间点的全部当前持有量。

time: 1,持有量为:A: 2, B: 1。在time: 2,持有量为:A: 2, B: 2。然而,这些藏品不包括A的藏品,因为它们没有改变。所以我想得到的是:

[
{
"time": 1,
"holdings": [
{ "owner": "A", "tokens": 2 },
{ "owner": "B", "tokens": 1 }
]
},
{
"time": 2,
"holdings": [
{ "owner": "A", "tokens": 2 },  // merged from prev doc.
{ "owner": "B", "tokens": 2 }
]
},
{
"time": 3,
"holdings": [
{ "owner": "A", "tokens": 3 },
{ "owner": "B", "tokens": 1 },
{ "owner": "C", "tokens": 1 }
]
},
{
"time": 4,
"holdings": [
{ "owner": "A", "tokens": 3 },  // merged from prev
{ "owner": "B", "tokens": 1 },  // merged from prev
{ "owner": "C", "tokens": 0 }
]
}
]

据我所知,$mergeObjects可以做到这一点,但我不明白如何将所有以前的文档按顺序合并到每个文档的当前文档。所以我正在寻找一种将setWindowFieldsmergeObjects结合起来的方法。

这是一个不错的挑战。

到目前为止,我得到了一个复杂的解决方案:

  1. 在我们所有的文档中获取我们所有的时间戳。这就是前4个步骤的目的。CCD_ 11用于累积该数据
  2. $group,并计算空的时间戳作为wantedTimes-接下来的5个步骤
  3. $set空时间戳,其中tokens: null将填充实际数据,$unwind将分离-接下来的3个步骤
  4. 使用$setWindowFields查找每个所有者在每个时间戳的最后一个已知令牌
  5. 为具有未知标记的文档填充此最后一个已知状态-2步
  6. $group和格式答案:
db.collection.aggregate([
{
$setWindowFields: {
sortBy: {time: 1},
output: {
allTimes: {$addToSet: "$time", window: {documents: ["unbounded", "current"]}
}
}
}
},
{
$setWindowFields: {
sortBy: {time: -1},
output: {
allTimes: {$addToSet: "$allTimes", window: {documents: ["unbounded", "current"]}
}
}
}
},
{
$set: {
allTimes: {
$reduce: {
input: "$allTimes",
initialValue: [],
in: {"$concatArrays": ["$$value", "$$this"]}
}
}
}
},
{$set: {allTimes: {$setIntersection: "$allTimes"}}},
{$unwind: "$holdings"},
{$sort: {time: 1}},
{$group: { _id: "$holdings.owner",
tokens: {$push: {tokens: "$holdings.tokens", time: "$time"}},
times: {$push: "$time"}, firstTime: {$first: "$time"},
allTimes: {$first: "$allTimes"}}
},
{
$addFields: {
wantedTimes: {
$filter: {
input: "$allTimes",
as: "item",
cond: {$gte: ["$$item", "$firstTime"]}
}
}
}
},
{
$project: {
tokens: 1,
wantedTimes: {$setDifference: ["$wantedTimes", "$times"]}
}
},
{
$set: {
data: {
$map: {
input: "$wantedTimes",
as: "item",
in: {time: "$$item", tokens: null}
}
}
}
},
{$project: {tokens: {"$concatArrays": ["$tokens", "$data"]}}},
{$unwind: "$tokens"},
{
$setWindowFields: {
partitionBy: "$_id",
sortBy: {"tokens.time": 1},
output: {
lastTokens: {
$push: "$tokens.tokens",
window: {documents: ["unbounded", "current"]}
}
}
}
},
{
$set: {
lastTokens: {
$filter: {
input: "$lastTokens",
as: "item",
cond: {$ne: ["$$item", null]}
}
}
}
},
{
$set: {
"tokens.tokens": {$ifNull: ["$tokens.tokens", {$last: "$lastTokens"}]}
}
},
{
$group: {
_id: "$tokens.time",
holdings: {$push: {owner: "$_id", tokens: "$tokens.tokens" }}
}
},
{$project: {time: "$_id", holdings: 1, _id: 0}},
{$sort: {time: 1}}
])

游乐场示例

从性能的角度来看,我建议您将其分为两个调用,第一个调用将是快速findOne,以获得集合中的最大时间值。

一旦你有了这个价值,管道就会变得更精简:

const maxItem = await db.collection.findOne({}).sort({ time: -1 });
db.collection.aggregate([
{
$unwind: "$holdings"
},
{
$group: {
_id: "$holdings.owner",
times: {
$push: {
time: "$time",
tokens: "$holdings.tokens"
}
},
minTime: {
$min: "$time"
}
}
},
{
$addFields: {
times: {
$reduce: {
input: {
$range: [
"$minTime",
maxItem.time + 1 // this is max time
]
},
initialValue: {
values: [],
lastIndex: 0
},
in: {
values: {
"$concatArrays": [
"$$value.values",
[
{
$cond: [
{
$in: [
"$$this",
"$times.time"
]
},
{
"$arrayElemAt": [
"$times",
"$$value.lastIndex"
]
},
{
"$mergeObjects": [
{
tokens: 0
},
{
"$arrayElemAt": [
"$times",
{
$subtract: [
"$$value.lastIndex",
1
]
}
]
},
{
time: "$$this"
}
]
}
]
}
]
]
},
lastIndex: {
$cond: [
{
$in: [
"$$this",
"$times.time"
]
},
{
$sum: [
"$$value.lastIndex",
1
]
},
"$$value.lastIndex"
]
}
}
}
}
}
},
{
$unwind: "$times.values"
},
{
$group: {
_id: "$times.values.time",
holdings: {
$push: {
owner: "$_id",
tokens: "$times.values.tokens"
}
}
}
},
{
$project: {
_id: 0,
time: "$_id",
holdings: 1
}
},
{
$sort: {
time: 1
}
}
])

这仍然是一个相当繁重的查询,因为它需要$unwind$group整个集合,但由于需求的原因,没有解决方法。如果集合对于这种方法来说太大,我建议逐个所有者迭代,或者逐个时间迭代,并相应地进行单独的更新。

Mongo游乐场

如果您根本不关心性能,并且希望在单个查询中使用它,那么您仍然可以使用相同的管道,您必须首先提取集合中的最长时间,这将需要添加一个初始$group阶段,如下所示:

db.collection.aggregate([
{
$group: {
_id: null,
maxTime: {
$max: "$time"
},
roots: {
$push: "$$ROOT"
}
}
},
{
$unwind: "$roots"
},
{
$replaceRoot: {
newRoot: {
"$mergeObjects": [
"$roots",
{
maxTime: "$maxTime"
}
]
}
}
},
... same pipeline ...
])

相关内容

最新更新