并行更新Cosmos数据库中的数据



我在并行更新Cosmos DB中的数据时遇到问题。

更新Cosmos Db需要:

  • 从文档中获取数据
  • 更改数据
  • 保存更改的数据

并行更新是一个问题,因为第二个线程可以在第一个线程保存更改之前获取数据。然后,前三个线程的数据会丢失,因为它被第二个线程覆盖。

参见代码:

public async Task UpdateValue(Dto dto)
{
var details = new detail(dto.Description, dto.EmployeeName);
var id = Guid.Parse(dto.SyncId);
var organizationId = Guid.Parse(dto.OrganizationId);
var result = await _container
.GetItemLinqQueryable<Sync>()
.Where(x => x.OrganizationId == organizationId && x.Id == id)
.ToFeedIterator()
.GetOneAsync();
result.ChangeValues(details);
await _container
.ReplaceItemAsync(result, id.ToString(), new PartitionKey(organizationId.ToString()));
}

我已经用SemaphoreSlim成功地解决了这个问题。

private static readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);
public async Task UpdateValue(Dto dto)
{
var details = new detail(dto.Description, dto.EmployeeName);
var id = Guid.Parse(dto.SyncId);
var organizationId = Guid.Parse(dto.OrganizationId);
await _semaphoreSlim.WaitAsync();
try
{
var result = await _container
.GetItemLinqQueryable<Sync>()
.Where(x => x.OrganizationId == organizationId && x.Id == id)
.ToFeedIterator()
.GetOneAsync();
result.ChangeValues(details);
await _container
.ReplaceItemAsync(result, id.ToString(), new PartitionKey(organizationId.ToString()));
}
finally
{
_semaphoreSlim.Release();
}
}

现在只有一个线程可以同时更新数据。但这并不是最好的解决方案。我不希望在CosmosDb中有并行更新的问题。对于这种情况下的最佳做法,你有什么建议吗?

几个表现不同的选项

  1. 按1推送您的更新
  2. 让事情并行运行,实现乐观并发控制(OCC(,并在发生冲突时重试,有关OCC的更多信息可以在这里找到
  • 使用某种队列系统,如EventHubs等,如果需要,可以保证排序等
  • 最新更新