如何在多任务环境中防止azure表中的重复插入



我有一个多任务应用程序,其中多个任务同时运行。每个任务检查recordId是否已经存在于azure表中。如果不是,就加。我的问题是,虽然我已经对recordId应用了检查,但仍然添加了重复的条目。

public async Task<bool> TryExecuteAsync(ServiceCommandMessage commandMessage, CancellationToken token, IProgress<string> progress)
        {
            token.ThrowIfCancellationRequested();
            var isSuccessful = true;
            return await System.Threading.Tasks.Task.Run(() =>
            {
                token.ThrowIfCancellationRequested();
                var watch = new Stopwatch();
                watch.Start();
                try
                {
                    StoreFourSqaureMetadata(id);
                }
                catch (Exception ex)
                {                    
                    isSuccessful = false;
                    throw ex;
                }
                watch.Stop();
                return isSuccessful;
            }, token);
        }
public static void StoreFourSqaureMetadata(string Id)
    {
        var noDataAvailable = "No data available".Trim();
        try
        {               
            var d = IsExist(Id); //Checking if Id already exist in Table
            if (d != null) return;
            //If not add to table
        }
    }

我认为您的问题的最佳解决方案有两个相当不言自明的部分:(1)在表中的适当列上创建唯一键;(2)在插入失败后捕获错误。

唯一键是真正重要的部分。这是确保这种事情不会发生的唯一方法,因为数据库是架构中唯一能够保证这种一致性的部分。

在可能出现问题的地方,我使用类似这样的模式。首先,我有一组帮助我重试的助手方法:

/// <summary>
/// Try a given async action 'n' times or until it succeeds.
/// </summary>
/// <param name="times">The number of times to retry the action</param>
/// <param name="action">The action to retry</param>
/// <param name="pauseInMilliseconds">The amount of time in milliseconds to pause between retries (defaults to 0)</param>
public async static Task<T> RetriesAsync<T>(this int times, Func<int, Task<T>> action, int pauseInMilliseconds)
{
    var attempt = 0;
    var result = default(T);
    while (attempt < times)
    {
        try
        {
            result = await action(attempt);
            break;
        }
        catch (Exception)
        {
            attempt++;
            if (attempt >= times)
            {
                throw;
            }
        }
        if (pauseInMilliseconds > 0)
        {
            await Task.Delay(pauseInMilliseconds);
        }
    }
    return result;
}

然后我有方法检查行是否存在;如果有,它就返回;如果没有,则插入并返回。它的工作原理是这样的:

private async Task<Customer> CreateOrGetCustomer(IEntities db, int customerId)
{
    var customer = await db.Customers.FirstOrDefaultAsync(x => x.CustomerId == customerId);
    if (customer == null)
    {
        customer = new Customer { CustomerId = customerId };
        db.Customers.Add(customer);
        await db.SaveChangesAsync();
    }
    return customer;
}

然后我用这样的重试调用该方法:

var customer = await 2.RetriesAsync(async x => CreateOrGetCustomer(db, customerId));

我相信还有更优雅的方法,但它是有效的——至少,如果你在你的表上配置了所有合适的唯一键,它是有效的。

认为这两个部分是相当不言自明的,但如果你需要更多的指导,或者如果它们不适合你,请告诉我。

这是一类常见的问题,称为竞态条件,它们可能特别麻烦,特别是当您处理数据库时。

当两个(或更多)线程试图同时添加相同的ID值时,问题就出现了。它们都检查数据库中的表,看看这个ID是否存在,都发现它不存在,然后都为它添加一条新记录。

有很多方法可以做到这一点:存储过程在检查和插入时锁定表,ID字段上的唯一键或索引在多次插入尝试时强制失败,单个线程负责插入,一个线程安全的插入ID集合,检查和插入到锁中,等等。选择哪种方法在很大程度上取决于应用程序的需求。

如果你不担心直接将数据放入数据库会有一些延迟,你可以在StoreFourSqaureMetadata方法中使用锁来确保在任何时候只有一个线程在更新数据库:

private static readonly object _lock = new object();
public static void StoreFourSqaureMetadata(string Id)
{
    var noDataAvailable = "No data available".Trim();
    lock(_lock)
    {
        try
        {               
            var d = IsExist(Id); //Checking if Id already exist in Table
            if (d != null) 
                return;
            //If not add to table
        }
        catch { }
    }
}

这将绝对防止两个线程同时尝试添加记录,代价是使所有操作排队并一次运行一个代码。它将防止线程对相同的数据进行多次插入,但最终会降低总体吞吐量。

如果没有更多关于你的具体问题的信息,我真的无法提出更具体的解决方案。例如,如果你总是获得新的id,而不必关心数据库中已经存在的内容,你可以在内存中维护一个列表,并且只锁定足够长的时间来检查和插入该列表中的条目……P

最新更新