从MongoDB(tMongoDbInput)加载最近尚未加载的行的数据



背景:

我创建了一个从MongoDB读取数据并将其加载到MS-SQL的作业。

当前行为:

每当我运行作业时,它都会从MongoDB中获取所有数据。

预期行为:

当作业运行时,它应该只获取尚未加载的数据。我在mongoDB文档中有一个时间戳字段。

示例

时间戳:2022-07-29T08:14:14.657+00:00

解决方案1:

我试图将查询添加到mongo中,以便只加载最后15个薄荷糖。

但问题是,例如,我的作业组件会停机1小时。

当它再次出现时,在下一次作业运行中,它将只加载最后15分钟的数据,而我们丢失了45分钟的数据。。

所需解决方案:

如果作业第一次运行,则它将提取所有时间的数据并加载到SQL。

当作业下次运行时(比如说在15分钟之后(,它将自动假设这些是新创建的,并且只加载新行。

更新

现在我已经写了一篇关于这个解决方案的完整文章。https://medium.com/@raowaqasakram/从mongoodb-taled-1f21ba7b98b5 获取测试数据

最合理的解决方案是直接从SQL表中导出最后一个时间戳(假设此字段只增加(:

// get the last timestamp from SQL
const lastTimestamp = SQLClient("select timestamp from sqltable order by timestamp desc limit 1");
const documentsToExport = db.collection.find({ timestamp: { $gt: new Date(lastTimestamp) }});
... export logic ...

这样,即使当前作业在中间失败,您也总是上传最后一个文档,您还应该确保插入文档以支持这一点。


您还可以维护一些包含作业详细信息的元数据集合,例如(在nodejs语法中(:

// get the last job saved.
const lastJob = await db.jobCollection.findOne({}, { sort: { _id: -1 });
// if this is the first job timestamp will be 1970 otherwise use previous job timestamp.
const nextTimestamp = lastJob?.timestamp ?? new Date('1970');
const documentsToExport = await db.collection.find({ timestamp: {$gt: nextTimestamp }}).sort({timestamp: 1});
if (documentsToExport.length) {
... upload to sql ...
// insert the latest timestamp from mongo, you can add additional metadata fields here.
// like run date, time took, documents inserted etc.
await.jobCollection.insertOne({ 
timestamp: documentsToExport[documentsToExport.length - 1].timestamp,
documentsInserted: documentsToExport.length,
createdAt: new Date()
})
}

显然,这个过程的容错性较差,这就是为什么如果可能的话,我建议您使用第一个。


最后一个解决方案是在不同的答案中提供的,即在每个文档上添加一个字段,以表明它是否已上传,但对于更大的集合,它将需要额外的索引和模式更改,如果可能的话,我更愿意避免这一点,我觉得这里就是这样。

您可以考虑在MongoDB集合模型中添加一个新属性。例如,您可以添加名为viewed的新属性,默认情况下该属性为false

然后,您可以始终查询viewed属性设置为false的所有文档,并立即将这些文档的属性更新为true。这样,它们就不会在下一次调用中被提取。

你可以用update()方法这样做:

db.collection.update({
"viewed": false
},
{
"$set": {
"viewed": true
}
})

这样,运行下一个查询的时间就不重要了,因为查询将始终返回所有尚未查看的文档。

最新更新