我正在使用RavenDB来保存数千个文档。数据来自每日 xml 源,我将通过运行 C# 控制台应用来更新该源。下面是处理源以使数据库与任何更改保持同步的代码。我在这方面遇到了很多问题,所以我想知道我是否选择了错误的策略。以下是一些需要注意的重要事项。
- 新商品可能已添加到 Feed 中,现有商品可能已经更改,因此每次运行时我都想添加或更新一个文档取决于它是否是新的。
- xml 提要不包含对我的 RavenDB ID 的任何引用,只包含每个项目的内部密钥。因此,当检索现有的要更新的文档我只能通过检查"源ID"来做到这一点属性。
- 我使用"take"一次只能处理 500 个文档,部分原因是我的数据库仅限于 1000 个文档,部分原因是没有Take() 我似乎只能检索 128 个文档。
- 就目前而言,这段代码会因"在一个会话中不能进行超过 30 次更新"错误而失败,我认为是因为每次我尝试从它实际命中的数据库项中检索现有记录再次数据库。
- 我可以通过对项目调用 ToList() 来解决上述 (4) 中的问题,但如果我这样做,当我调用时现有项目不会更新会期。SaveChanges()(我把它想象成一个断开连接的人记录集)。
谁能给我一些指示?
public void ProcessFeed(string rawXml)
{
XDocument doc = XDocument.Parse(rawXml);
var items = ExtractItemsFromFeed(doc).OrderBy(x => x.SourceId).Take(500);
using (IDocumentSession session = _store.OpenSession())
{
var dbItems = session.Query<AccItem>().OrderBy(x => x.SourceId).Take(500);
foreach (var item in items)
{
var existingRecord = dbItems.SingleOrDefault(x => x.SourceId == item.SourceId);
if (existingRecord == null)
{
session.Store(item);
_logger.Info("Saved new item {0}.", item.ShortName);
}
else
{
// update just one field for now
existingRecord.Village = item.Village;
_logger.Info("Updated item {0}.", item.ShortName);
}
}
session.SaveChanges();
}
}
以下是我最终得到的代码。我认为原始版本的最初问题只是我试图对每个项目使用相同的会话,打破了 30 的限制。
在TekPub 截屏视频中,屏幕上的一些代码提示,我通过将整个过程批处理为 15 个集合来解决此问题(允许一次读取和一次写入,因此每批总共 30 个请求)。这很慢,但并不像我预期的那么慢。我预计一次可能有 10,000 条记录,所以我会让它滴答作响,直到它完成。
public void ProcessFeed(string rawXml)
{
XDocument doc = XDocument.Parse(rawXml);
var items = ExtractItemsFromFeed(doc).OrderBy(x => x.SourceId);
int numberOfItems = items.Count;
int batchSize = 15;
int numberOfBatchesRequired = numberOfItems / batchSize;
int numberOfBatchesProcessed = 0;
int numberOfItemsInLastBatch = numberOfItems - (numberOfBatchesRequired * batchSize);
for (var i = 0;i <= numberOfBatchesRequired;i++)
{
using (IDocumentSession session = _store.OpenSession())
{
var numberOfItemsProcessedSoFar = numberOfBatchesProcessed * batchSize;
var numberOfItemsRemaining = numberOfItems - numberOfItemsProcessedSoFar;
int itemsToTake = 15;
if (numberOfItemsRemaining > 0 && numberOfItemsRemaining < 15)
itemsToTake = numberOfItemsRemaining;
foreach (var item in items.Skip(numberOfItemsProcessedSoFar).Take(itemsToTake))
{
var existingRecords = session.Query<AccItem>().Where(x => x.SourceId == item.SourceId).ToList();
if (!existingRecords.Any())
{
session.Store(item);
_logger.Info("Saved new item {0}.", item.ShortName);
}
else
{
if (existingRecords.Count() > 1)
_logger.Warn("There's more than one item in the database with the sourceid {0}", item.SourceId);
existingRecords.First().Village = item.Village;
_logger.Info("Updated item {0}.", item.ShortName);
}
session.SaveChanges();
}
}
numberOfBatchesProcessed++;
}
}