Azure Cosmos DB: improving bulk insert performance



I am using the Azure Cosmos DB SDK (3.0) for CRUD operations. When I try to insert 8,000-10,000 records at once, it takes almost 3-4 minutes.

Here is my code:

    public async Task<ResultDto> HandleAsync(EnableOrDisableSubscriptionCommand command, ILogger logger)
    {
        logger.Info("Started EnableOrDisableSubscriptionCommand ", nameof(EnableOrDisableSubscriptionCommand));

        if (command.UiNotifications.Any())
        {
            await AddSubscription(command, SubscriptionAction.UiNotification, command.UiNotifications);
            logger.Info("Added UI notification subscriptions");
        }
        if (command.EmailNotifications.Any())
        {
            await AddSubscription(command, SubscriptionAction.Email, command.EmailNotifications);
            logger.Info("Added Email notification subscriptions");
        }
        return new ResultDto { ResultType = ResultType.Success, Message = $"User {command.UserId} SubscriptionStatus" };
    }

    private async Task AddSubscription(EnableOrDisableSubscriptionCommand command, SubscriptionAction subscriptionAction, IList<int> notificationCategoryTypes)
    {
        foreach (var notificationCategory in notificationCategoryTypes)
        {
            var notificationTypes = Utility.GetNotificationTypes((NotificationCategoryType)notificationCategory);
            foreach (var notificationType in notificationTypes)
            {
                foreach (var payerAccountSubscriptions in command.Subscriptions)
                {
                    if (payerAccountSubscriptions.AccountNumbers?.Any() ?? false)
                    {
                        foreach (var accountNumber in payerAccountSubscriptions.AccountNumbers.Where(a => !string.IsNullOrEmpty(a)))
                        {
                            await _repository.Create(subscriptionAction, notificationType,
                                payerAccountSubscriptions.ColCoId, payerAccountSubscriptions.PayerNumber, accountNumber, command.UserRole,
                                command.UserId);
                        }
                    }
                    else
                    {
                        await _repository.Create(subscriptionAction, notificationType,
                            payerAccountSubscriptions.ColCoId, payerAccountSubscriptions.PayerNumber, null, command.UserRole,
                            command.UserId);
                    }
                }
            }
        }
    }

The Create method of the subscription repository:

    public async Task Create(SubscriptionAction subscriptionAction, NotificationType notificationType,
        int colCoId, string payerNumber, string accountNumber, UserRole userRole, string userId, string cardId = null)
    {
        var eventType = Utility.GetEventType(notificationType);
        var subscriptionBase = new Subscription
        {
            Id = Guid.NewGuid(),
            IsActive = true,
            Action = subscriptionAction,
            ActionDesc = subscriptionAction.ToString(),
            Version = (int)SubscriptionVersion.V2,
            NotificationType = notificationType,
            NotificationTypeDesc = notificationType.ToString(),
            EventType = eventType,
            EventTypeDesc = eventType.ToString(),
            ColCoId = colCoId,
            PayerNumber = payerNumber,
            AccountNumber = accountNumber,
            CardId = cardId,
            DistributionGroups = new List<string> { userRole.ToString() },
            DistributionUserIds = new List<string> { userId }
        };
        await CreateItemAsync(subscriptionBase);
    }

The generic repository:

    public async Task<ItemResponse<T>> CreateItemAsync(T item)
    {
        return await _container.CreateItemAsync<T>(item);
    }

Because of this, my HTTP-triggered Azure Function fails with a System.OutOfMemoryException.

How can I improve this?

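You can improve this by setting AllowBulkExecution = true on the client and adding each insert operation to a list of tasks that are then executed together.

You can learn more and see an example here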
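
As a rough illustration of the first half of that advice, the option is set once, when the client is constructed. This is only a minimal sketch: the class name `CosmosClientFactory` and the `connectionString` parameter are hypothetical, and as far as I know the `AllowBulkExecution` option requires version 3.4.0 or later of the Microsoft.Azure.Cosmos package, so an SDK upgrade from 3.0 may be needed.

```csharp
using Microsoft.Azure.Cosmos;

// Hypothetical helper, not part of the original code base.
public static class CosmosClientFactory
{
    // connectionString is a placeholder supplied by the caller (for example from app settings).
    public static CosmosClient CreateBulkClient(string connectionString)
    {
        var options = new CosmosClientOptions
        {
            // Lets the SDK group concurrently issued point operations into batched service calls.
            AllowBulkExecution = true
        };
        return new CosmosClient(connectionString, options);
    }
}
```

The client is intended to be created once and reused for the lifetime of the application rather than once per request.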

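Edit (too long to add as a comment): How many RU/s you need depends on many factors, including how fast you want to ingest the data. I would measure the cost of inserting one of your items, then divide your provisioned throughput by that cost. The result should be the number of items you can insert per second (assuming nothing else is consuming throughput). If one item costs 10 RU to insert and you have 3,000 RU/s provisioned, you can ingest 300 items per second. For 10,000 items, that is 10,000 / 300 per second, or about 33 seconds.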
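
The estimate above can be written down as a small calculation. This is only a sketch of the arithmetic: the class and method names are made up for illustration, and the per-item cost would have to be measured first, for instance by reading the `RequestCharge` property of the `ItemResponse<T>` returned by a single insert.

```csharp
// Hypothetical helper that mirrors the back-of-the-envelope estimate.
public static class ThroughputEstimate
{
    // Items per second = provisioned RU/s divided by the RU cost of one insert.
    public static double ItemsPerSecond(double provisionedRuPerSecond, double ruPerInsert)
        => provisionedRuPerSecond / ruPerInsert;

    // Seconds needed = item count divided by items per second.
    public static double SecondsToInsert(int itemCount, double provisionedRuPerSecond, double ruPerInsert)
        => itemCount / ItemsPerSecond(provisionedRuPerSecond, ruPerInsert);
}
```

With the numbers from the example, `ItemsPerSecond(3000, 10)` gives 300 and `SecondsToInsert(10000, 3000, 10)` gives roughly 33.3. This ignores retries, indexing differences between items, and any other workload sharing the same throughput.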

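So if this is taking 3-4 minutes, something else is wrong with your code. I would go back and read the article I posted above, because I don't see you implementing the pattern we suggest, specifically adding each operation to a List object and then calling await Task.WhenAll(this.Tasks);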
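
A minimal sketch of that pattern, assuming the items have already been built in memory: the class name `BulkInsert` and the method `InsertAllAsync` are hypothetical, and `container` is assumed to come from a client created with AllowBulkExecution = true.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

// Hypothetical helper, not part of the original code base.
public static class BulkInsert
{
    public static async Task InsertAllAsync<T>(Container container, IEnumerable<T> items)
    {
        var tasks = new List<Task>();
        foreach (var item in items)
        {
            // Start the insert without awaiting it, so the operations can run concurrently.
            tasks.Add(container.CreateItemAsync(item));
        }

        // Wait once for all pending inserts instead of awaiting each one in the loop.
        await Task.WhenAll(tasks);
    }
}
```

Applied to the code in the question, the nested loops in AddSubscription would collect Subscription objects (or their tasks) instead of awaiting _repository.Create on every iteration, which is what currently forces one network round trip at a time.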

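Another issue I see is that you are not specifying the partition key in your CreateItemAsync() call. This will write everything into a null partition, which will eventually stop accepting new writes once its size reaches 20 GB.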
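
For illustration, the partition key can be passed as the second argument to CreateItemAsync. This sketch assumes, hypothetically, that the container is partitioned on a string-valued property; which property of Subscription that would be is not stated in the question, so the value is simply taken as a parameter here.

```csharp
using System.Threading.Tasks;
using Microsoft.Azure.Cosmos;

// Hypothetical helper, not part of the original code base.
public static class PartitionedInsert
{
    // partitionKeyValue must match the value stored in the item's partition key property.
    public static Task<ItemResponse<T>> CreateWithKeyAsync<T>(
        Container container, T item, string partitionKeyValue)
    {
        return container.CreateItemAsync(item, new PartitionKey(partitionKeyValue));
    }
}
```

Choosing a key with many distinct values (rather than one shared by every document) spreads writes across partitions and keeps any single logical partition well below the size limit mentioned above.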
