我有一个集合,其中有一些重复的文档。例如:
第一份文件:
{
"_id" : ObjectId("56f3d7cc1de31cb20c08ae6b"),
"AddedDate" : ISODate("2016-05-01T00:00:00.000Z"),
"Place": "THISPLACE",
"PresentInDB" : [
{
"InDB" : ISODate("2016-05-01T00:00:00.000Z")
}
],
"Checked" : [],
"Link": "http://www.mylink.com/first/84358"
}
第二份文件:
{
"_id" : ObjectId("577740526c1e542904725238"),
"AddedDate" : ISODate("2016-05-02T00:00:00.000Z"),
"Place": "THISPLACE",
"PresentInDB" : [
{
"InDB" : ISODate("2016-05-02T00:00:00.000Z")
},
{
"InDB" : ISODate("2016-05-03T00:00:00.000Z")
}
],
"Checked" : [
{
"Done" : ISODate("2016-05-02T00:00:00.000Z")
},
],
"Link": "http://www.mylink.com/second/84358"
}
Link
字段包含两个文档中相同的编号序列,即84358
。
所以我想实现这些步骤:
- 循环遍历集合中的每个文档
- 匹配
Link
字段中每个文档的编号序列(即上面的84358
),如果中存在多个文档在Link
字段中具有该序列的集合。此外,如果两个文档中的Place
字段匹配: - 合并
PresentInDB
和Checked
字段->通过添加最新文档中的数组值(按AddedDate
中的日期)合并PresentInDB
和Checked
字段字段)转换为最旧的文档 - 删除最新的文档
我如何实现这样的查询?
在MongoDB 3.3.6版本中引入了一个$split
运算符,用于处理聚合框架(Jira)中的字符串。在此版本之前,您只能使用map/reduce解决方案来解决此问题。
MongoDB 3.3.6发布后:聚合框架解决方案
db.duplicatedCollection.aggregate(
[
{
$project: {
_id : 1,
AddedDate : 1,
Place : 1,
PresentInDB : 1,
Checked : 1,
Link : 1,
sequenceNumber: { $arrayElemAt: [ {$split: ["$Link", "/"]}, -1 ]},
}
},
{
$sort: { AddedDate: 1 }
},
{
$group: {
_id : {
sequenceNumber : "$sequenceNumber",
Place : "$Place"
},
id : { $first: "$_id"},
AddedDate: { $first: "$AddedDate" },
Place : { $first: "$Place" },
PresentInDB: {
$push: '$PresentInDB'
},
Checked: {
$push: '$Checked'
},
Link: { $first: "$Link"}
}
},
{
$unwind: "$PresentInDB"
},
{
$unwind: {
path : "$PresentInDB",
preserveNullAndEmptyArrays: true
}
},
{
$unwind: "$Checked"
},
{
$unwind: {
path : "$Checked",
preserveNullAndEmptyArrays: true
}
},
{
$group: {
_id : "$id",
AddedDate: { $first: "$AddedDate" },
Place : { $first: "$Place" },
PresentInDB : {
$addToSet: '$PresentInDB'
},
Checked : {
$addToSet: '$Checked'
},
Link: { $first: "$Link"}
}
},
{
$out: "duplicatedCollection"
}
]
);
MongoDB 3.3.6发布前:映射/减少解决方案
地图功能:
var mapFunction = function() {
var linkArray = this.Link.split("/");
var sequenceNumber = linkArray[linkArray.length - 1];
var keyDoc = {
place : this.Place,
sequenceNumber: sequenceNumber,
};
emit(keyDoc, this);
};
减少功能:
var reduceFunction = function(key, values) {
var reducedDoc = {};
reducedDoc._id = values[0]._id;
reducedDoc.AddedDate = values[0].AddedDate;
reducedDoc.Link = values[0].Link;
reducedDoc.PresentInDB = [];
reducedDoc.Checked = [];
var presentInDbMillisArray = [];
var checkedMillisArray = [];
values.forEach(function(doc) {
if (reducedDoc.AddedDate < doc.AddedDate) {
reducedDoc._id = doc._id;
reducedDoc.AddedDate = doc.AddedDate;
reducedDoc.Link = doc.Link;
}
// PresentInDB field merge
doc.PresentInDB.forEach(function(presentInDBElem) {
var millis = presentInDBElem.InDB.getTime();
if (!Array.contains(presentInDbMillisArray, millis)) {
reducedDoc.PresentInDB.push(presentInDBElem);
presentInDbMillisArray.push(millis);
}
});
// same here with Checked field
doc.Checked.forEach(function(checkedElem) {
var millis = checkedElem.Done.getTime();
if (!Array.contains(checkedMillisArray, millis)) {
reducedDoc.Checked.push(checkedElem);
checkedMillisArray.push(millis);
}
});
});
return reducedDoc;
};
映射/减少:
db.duplicatedCollection.mapReduce(
mapFunction,
reduceFunction,
{
"out": "duplicatedCollection"
}
);
从映射中展开值/减少返回的文档:
db.duplicatedCollection.find(
{
value : {
$exists: true
}
}
).forEach(function(doc) {
db.duplicatedCollection.insert(doc.value);
db.duplicatedCollection.remove({_id : doc._id});
});
您可以使用单个aggregation
查询来执行此操作:
db.device.aggregate([{
"$unwind": "$PresentInDB"
}, {
"$match": {
"Link": /84358/
}
}, {
"$sort": {
"AddedDate": 1
}
}, {
"$group": {
_id: 0,
PresentInDB: {
$addToSet: '$PresentInDB'
},
AddedDate: {
$first: "$AddedDate"
},
id: {
$first: "$_id"
},
Link: {
$first: "$Link"
}
}
}, {
$out: "documents"
}])
$unwind
您的阵列对其进行处理$match
您的id(此处包含84358)- 按升序排列的
$sort
$group
具有:- 一个
$addToSet
,将所有PresentInDB
合并为一个没有重复的单个阵列 - 为每个字段保留一个CCD_ 20。保留第一个意味着你只想要旧的,因为我们以前是按升序排序的
- 一个
$out
将在此处将结果保存到名为documents
的新集合中