Need an inbuilt way to add deadlock resilience to Dapper without altering existing repos



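I need to make all existing repos (around 30+) tolerant of deadlocks, so that they recover from one by logging it and waiting before retrying.

An attempt that worked: after some research, I posted an answer below that uses Polly with a custom SqlResiliencyPolicy tailored to the project.

But what I am really looking for is this. The current approach (please find the answer below) requires me to either

  1. Wrap all existing DB calls with await _policy.ExecuteAsync, OR
  2. Provide custom overloads that accept an IAsyncPolicy parameter and then call the intended method. Sort of an extension to IDbConnection: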

public static Task<T> GetAsync<T>(this IDbConnection connection, object primaryKey, IAsyncPolicy policy) =>
    policy.ExecuteAsync(() => GetAsync<T>(...));

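Either way, I need to change all of my 30+ repos. Is there an inbuilt way in Dapper, or some other approach, where we can

configure a policy at startup and have Dapper become resilient on its own (falling back to its fault-tolerance mechanism), similar to the way resilience policies are added to an HTTP client when you register the client?

With that, code changes would be kept to a minimum: no need to touch the repos, only startup.

I have an approach below and am looking for an improvement over it.

I went ahead and implemented the second approach above. It decouples the policy, which would otherwise have to be DI'ed, from the existing repos. Extension methods on IDbConnection take care of wrapping the policy around the existing methods.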

public class SqlResiliencePolicyFactory
{
    // 1205 is the SQL Server error number for "chosen as the deadlock victim".
    private readonly ISet<int> _transientDbErrors = new HashSet<int>(new[] { 1205 });
    private readonly ILogger _logger;
    private readonly IConfiguration _configuration;
    public SqlResiliencePolicyFactory(ILogger logger, IConfiguration configuration)
    {
        _logger = logger;
        _configuration = configuration;
    }
    public IPolicyRegistry<string> GetSqlResiliencePolicies(int transientErrorRetries = 3)
    {
        return new PolicyRegistry
        {
            {
                "DbDeadLockResilience",
                Policy
                    .Handle<SqlException>(ex => _transientDbErrors.Contains(ex.Number))
                    .WaitAndRetry(
                        retryCount: transientErrorRetries,
                        sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                        onRetry: LogRetryAction)
            },
            {
                "DbDeadLockResilienceAsync",
                Policy
                    .Handle<SqlException>(ex => _transientDbErrors.Contains(ex.Number))
                    .WaitAndRetryAsync(
                        retryCount: transientErrorRetries,
                        sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                        onRetry: LogRetryAction)
            }
        };
    }
    // Shared by the sync and async policies; logs every retry as a warning.
    private void LogRetryAction(Exception exception, TimeSpan sleepTime, int reattemptCount, Context context) =>
        _logger.Log(
            LogLevel.Warning,
            exception,
            $"Transient DB Failure while executing query, error number: {((SqlException)exception).Number}; reattempt number: {reattemptCount}");
}
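
To get a feel for what the sleepDurationProvider above amounts to, here is a minimal sketch that reproduces its formula outside Polly. BackoffSchedule and its members are hypothetical names used only for illustration; the one assumption carried over from Polly is that the attempt number passed to the provider starts at 1.

```csharp
using System;
using System.Linq;

// Hypothetical helper, not part of Polly or of the factory above.
public static class BackoffSchedule
{
    // Same formula as the sleepDurationProvider: attempt * 100 ms, attempt is 1-based.
    public static TimeSpan DelayFor(int attempt) =>
        TimeSpan.FromMilliseconds(attempt * 100);

    // The wait before each retry, in order.
    public static TimeSpan[] For(int retries) =>
        Enumerable.Range(1, retries).Select(DelayFor).ToArray();

    // Total time spent waiting if every retry is used up.
    public static TimeSpan TotalWait(int retries) =>
        TimeSpan.FromMilliseconds(For(retries).Sum(d => d.TotalMilliseconds));
}
```

With the default of 3 retries this gives waits of 100 ms, 200 ms and 300 ms, so a query that keeps deadlocking fails for good after about 600 ms of waiting plus the time taken by the four attempts themselves.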

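In Startup:

DapperExtensions.SetPolicies(new SqlResiliencePolicyFactory(_logger, _configuration)
    .GetSqlResiliencePolicies());

Create the extension methods in a separate class to wrap the policy around the existing methods used by the repos. Extension methods: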

public static class DapperExtensions
{
    // Default to no-op policies so the wrappers work even before SetPolicies is called.
    private static Policy _policy = Policy.NoOp();
    private static IAsyncPolicy _asyncPolicy = Policy.NoOpAsync();
    public static void SetPolicies(IReadOnlyPolicyRegistry<string> readOnlyPolicyRegistry)
    {
        _policy = readOnlyPolicyRegistry.Get<Policy>("DbDeadLockResilience");
        _asyncPolicy = readOnlyPolicyRegistry.Get<IAsyncPolicy>("DbDeadLockResilienceAsync");
    }
    public static T GetFirstWithRetry<T>(this IDbConnection connection,
        string? sql = null, object? parameters = null, IDbTransaction? transaction = null) where T : class =>
        _policy.Execute(() => connection.GetFirst<T>(sql, parameters, transaction));
    public static T QueryFirstOrDefaultWithRetry<T>(this IDbConnection connection, string sql,
        object? parameters = null, IDbTransaction? transaction = null) =>
        _policy.Execute(() => connection.QueryFirstOrDefault<T>(sql, parameters, transaction));
    public static async Task<bool> UpdateAsyncWithRetry<T>(this IDbConnection connection, T entityToUpdate, IEnumerable<string> columnsToUpdate,
        IDbTransaction? transaction = null) where T : class =>
        await _asyncPolicy.ExecuteAsync(async () => await connection.UpdateAsync(entityToUpdate, columnsToUpdate, transaction));
    // Similarly, add wrappers for all the other methods used in the existing repos.
}

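Now,

  1. The existing repos are independent of the policy (no DI in the repos)
  2. The policy is kept in a separate place, following SRP
  3. The policies behind the Dapper extensions can be swapped out, which makes testing easier

So the existing repos only have to change the method names and call the wrappers above instead of calling the Dapper methods directly, and the policy will be applied. Don't forget to run a regression test on the repos.

So far, the following is the most suitable way to do this with minimal/no changes to the existing repos. Thanks to @Sergey Akopov for the blog post, and to my colleague who pointed me to it.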

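Short answer: use the Decorator pattern to wrap SQL Client's Connection and Command instances, and inject Polly's retry policy into these decorators. With this, all SQL execution endpoints get wrapped with the retry policy. It is compatible with Dapper, because Dapper is an extension to IDbConnection.

Create a DI'able retry policy that encapsulates the policy inside it. We could also decouple the policy completely into a separate class and register it for DI (not shown in this answer, but done in the other answers; don't forget to use PolicyRegistry if you have more than one policy).

Git repo: https://github.com/VinZCodz/SqlTransientFaultHandling

Details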

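Interface for the policy. It has no async methods because the DbConnection and DbCommand members that the decorators below override are synchronous. Microsoft.Data.SqlClient does offer async methods, but as far as I can tell the base-class async methods fall back to the synchronous ones unless they are overridden, so Dapper's async calls still pass through the retry.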

public interface IRetryPolicy
{
    void Execute(Action operation);
    TResult Execute<TResult>(Func<TResult> operation);
}
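
A synchronous-only interface can still cover async callers, as long as the decorator does not override the async members. The sketch below is only an illustration of that point: CountingConnection and Demo are hypothetical names, and the class implements just enough of DbConnection to show that the default OpenAsync ends up calling the overridden Open().

```csharp
using System;
using System.Data;
using System.Data.Common;
using System.Threading.Tasks;

// Hypothetical minimal connection: only Open() and Close() do anything.
public sealed class CountingConnection : DbConnection
{
    private ConnectionState _state = ConnectionState.Closed;

    public int OpenCalls { get; private set; }

    public override string ConnectionString { get; set; } = string.Empty;
    public override string Database => string.Empty;
    public override string DataSource => string.Empty;
    public override string ServerVersion => string.Empty;
    public override ConnectionState State => _state;

    public override void Open()
    {
        OpenCalls++;                      // count every synchronous open
        _state = ConnectionState.Open;
    }

    public override void Close() => _state = ConnectionState.Closed;
    public override void ChangeDatabase(string databaseName) { }

    protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) =>
        throw new NotSupportedException();

    protected override DbCommand CreateDbCommand() =>
        throw new NotSupportedException();
}

public static class Demo
{
    public static async Task<int> OpenAsyncAndCount()
    {
        using var connection = new CountingConnection();
        // OpenAsync is not overridden, so the base implementation invokes Open().
        await connection.OpenAsync();
        return connection.OpenCalls;
    }
}
```

Demo.OpenAsyncAndCount() returns 1 here. The same default applies to the DbCommand async methods, which is why a retry placed in the synchronous overrides can also take effect for calls such as Dapper's ExecuteAsync.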

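The concrete implementation, which embeds the policy and wraps the retry logic around every DB call made through the SQL client, and therefore through Dapper.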

public class RetryPolicy : IRetryPolicy
{
    private readonly ILogger<RetryPolicy> _logger;
    private readonly Policy _retryPolicy;
    // 1205 is the SQL Server error number for "chosen as the deadlock victim".
    private readonly ISet<int> _transientDbErrors = new HashSet<int>(new[] { 1205 });
    private const int _transientErrorRetries = 3;
    public RetryPolicy(ILogger<RetryPolicy> logger)
    {
        _logger = logger;
        _retryPolicy = Policy
            .Handle<SqlException>(ex => _transientDbErrors.Contains(ex.Number))
            .WaitAndRetry(
                retryCount: _transientErrorRetries,
                sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                onRetry: LogRetryAction);
    }
    public void Execute(Action operation) => _retryPolicy.Execute(operation.Invoke);
    public TResult Execute<TResult>(Func<TResult> operation) => _retryPolicy.Execute(() => operation.Invoke());
    private void LogRetryAction(Exception exception, TimeSpan sleepTime, int reattemptCount, Context context) =>
        _logger.LogWarning(
            exception,
            $"Transient DB Failure while executing query, error number: {((SqlException)exception).Number}; reattempt number: {reattemptCount}");
}
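
Because the decorators depend only on IRetryPolicy, a Polly-free stand-in can be swapped in for unit tests that should not wait between attempts. The following is a rough sketch under that assumption: ImmediateRetryPolicy is a hypothetical class, not something from Polly or from the repo linked above, and the interface is repeated only so that the snippet compiles on its own.

```csharp
using System;

// Same shape as the IRetryPolicy interface in this answer, repeated for self-containment.
public interface IRetryPolicy
{
    void Execute(Action operation);
    TResult Execute<TResult>(Func<TResult> operation);
}

// Hypothetical test double: retries immediately, with no delay and no logging.
public sealed class ImmediateRetryPolicy : IRetryPolicy
{
    private readonly int _retries;
    private readonly Func<Exception, bool> _isTransient;

    public ImmediateRetryPolicy(int retries, Func<Exception, bool> isTransient)
    {
        _retries = retries;
        _isTransient = isTransient;
    }

    // Number of retries performed so far (the first attempt is not counted).
    public int RetryCount { get; private set; }

    public void Execute(Action operation) =>
        Execute(() => { operation(); return true; });

    public TResult Execute<TResult>(Func<TResult> operation)
    {
        for (var attempt = 0; ; attempt++)
        {
            try
            {
                return operation();
            }
            // Only swallow the exception while retries remain and it is transient;
            // otherwise the filter is false and the exception propagates.
            catch (Exception ex) when (attempt < _retries && _isTransient(ex))
            {
                RetryCount++;
            }
        }
    }
}
```

For example, new ImmediateRetryPolicy(3, ex => ex is TimeoutException) runs an operation up to four times in total, and rethrows straight away on any exception the predicate rejects.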

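Now we somehow need to inject this policy into SqlClient's connection and command. That calls for a sealed class which 'is-a' DbConnection (so the DAL endpoints stay the same) and also 'has-a' DbConnection (it mimics the operations, but with retries):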

public sealed class ReliableSqlDbConnection : DbConnection
{
    private readonly SqlConnection _underlyingConnection;
    private readonly IRetryPolicy _retryPolicy;
    private bool _disposedValue;
    private string _connectionString;
    public ReliableSqlDbConnection(string connectionString, IRetryPolicy retryPolicy)
    {
        _connectionString = connectionString;
        _retryPolicy = retryPolicy;
        _underlyingConnection = new SqlConnection(connectionString);
    }
    public override string ConnectionString
    {
        get => _connectionString;
        set => _underlyingConnection.ConnectionString = _connectionString = value;
    }
    // Opening the connection goes through the retry policy.
    public override void Open()
    {
        _retryPolicy.Execute(() =>
        {
            if (_underlyingConnection.State != ConnectionState.Open)
            {
                _underlyingConnection.Open();
            }
        });
    }
    public override string Database => _underlyingConnection.Database;
    public override string DataSource => _underlyingConnection.DataSource;
    public override string ServerVersion => _underlyingConnection.ServerVersion;
    public override ConnectionState State => _underlyingConnection.State;
    public override void ChangeDatabase(string databaseName) => _underlyingConnection.ChangeDatabase(databaseName);
    public override void Close() => _underlyingConnection.Close();
    protected override DbTransaction BeginDbTransaction(IsolationLevel isolationLevel) => _underlyingConnection.BeginTransaction(isolationLevel);
    // Commands created from this connection are decorated with the same policy.
    protected override DbCommand CreateDbCommand() => new ReliableSqlDbCommand(_underlyingConnection.CreateCommand(), _retryPolicy);
}

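Since we instantiate the SqlConnection ourselves whenever one is needed, we also need to dispose of it properly, following the derived-type dispose pattern recommended by Microsoft: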

protected override void Dispose(bool disposing)
{
    if (!_disposedValue)
    {
        if (disposing)
        {
            if (_underlyingConnection.State == ConnectionState.Open)
            {
                _underlyingConnection.Close();
            }
            _underlyingConnection.Dispose();
        }
        _disposedValue = true;
    }
    base.Dispose(disposing);
}

Follow a similar approach for DbCommand:

public sealed class ReliableSqlDbCommand : DbCommand
{
    private readonly SqlCommand _underlyingSqlCommand;
    private readonly IRetryPolicy _retryPolicy;
    private bool _disposedValue;
    public ReliableSqlDbCommand(SqlCommand command, IRetryPolicy retryPolicy)
    {
        _underlyingSqlCommand = command;
        _retryPolicy = retryPolicy;
    }
    public override string CommandText
    {
        get => _underlyingSqlCommand.CommandText;
        set => _underlyingSqlCommand.CommandText = value;
    }
    public override int CommandTimeout
    {
        get => _underlyingSqlCommand.CommandTimeout;
        set => _underlyingSqlCommand.CommandTimeout = value;
    }
    public override CommandType CommandType
    {
        get => _underlyingSqlCommand.CommandType;
        set => _underlyingSqlCommand.CommandType = value;
    }
    public override bool DesignTimeVisible
    {
        get => _underlyingSqlCommand.DesignTimeVisible;
        set => _underlyingSqlCommand.DesignTimeVisible = value;
    }
    public override UpdateRowSource UpdatedRowSource
    {
        get => _underlyingSqlCommand.UpdatedRowSource;
        set => _underlyingSqlCommand.UpdatedRowSource = value;
    }
    protected override DbConnection DbConnection
    {
        get => _underlyingSqlCommand.Connection;
        set => _underlyingSqlCommand.Connection = (SqlConnection)value;
    }
    protected override DbParameterCollection DbParameterCollection => _underlyingSqlCommand.Parameters;
    protected override DbTransaction DbTransaction
    {
        get => _underlyingSqlCommand.Transaction;
        set => _underlyingSqlCommand.Transaction = (SqlTransaction)value;
    }
    public override void Cancel() => _underlyingSqlCommand.Cancel();
    // The execution endpoints below are the ones wrapped with the retry policy.
    public override int ExecuteNonQuery() => _retryPolicy.Execute(() => _underlyingSqlCommand.ExecuteNonQuery());
    public override object ExecuteScalar() => _retryPolicy.Execute(() => _underlyingSqlCommand.ExecuteScalar());
    public override void Prepare() => _retryPolicy.Execute(() => _underlyingSqlCommand.Prepare());
    protected override DbParameter CreateDbParameter() => _underlyingSqlCommand.CreateParameter();
    protected override DbDataReader ExecuteDbDataReader(CommandBehavior behavior) => _retryPolicy.Execute(() => _underlyingSqlCommand.ExecuteReader(behavior));
    protected override void Dispose(bool disposing)
    {
        if (!_disposedValue)
        {
            if (disposing)
            {
                _underlyingSqlCommand.Dispose();
            }
            _disposedValue = true;
        }
        base.Dispose(disposing);
    }
}

On the existing DAL side:

DI:

services.AddScoped<IRetryPolicy, RetryPolicy>();
services.Configure<DbConnectionOption>(options =>
{
    options.ConnectionString = connectionString;
});

Lazily load the decorator:

_connection = new Lazy<IDbConnection>(() =>
{
    return new ReliableSqlDbConnection(_dbOptions.ConnectionString, _retryPolicy);
});

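xUnit test: this test actually creates a deadlock on a single session and retries it.

Thanks to @Martin-Smith for the awesome script. More on the script here: Simulate a deadlock on SQL Server using a single client and a single session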

[Fact]
public async Task It_creates_reliablesqldbConnection_and_deadlock_itself_to_log_and_retry()
{
    var logger = new FakeLogger<RetryPolicy>(); // create your own logger.
    using var reliableSqlDbConnection = new ReliableSqlDbConnection(_fixture.Configuration["ConnectionStrings:DataContext"],
        new RetryPolicy(logger)); // create your own fixture.
    // Awesome script that deadlocks itself on a single connection and process, using its own metadata.
    // The assertion must be awaited, otherwise the test never observes its outcome.
    await Assert.ThrowsAsync<SqlException>(() => reliableSqlDbConnection.ExecuteAsync(
        @"BEGIN TRAN
CREATE TYPE dbo.OptionIDs AS TABLE( OptionID INT PRIMARY KEY )
EXEC ('DECLARE @OptionIDs dbo.OptionIDs;')
ROLLBACK "));
    Assert.Equal(LogLevel.Warning, logger.Logs.Select(g => g.Key).First());
    var retries = logger.Logs[LogLevel.Warning].First();
    Assert.Equal(3, retries.Count());
    Assert.Equal("Transient DB Failure while executing query, error number: 1205; reattempt number: 1", retries.First());
}

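Summary: with this, opening the connection, ExecuteReader, ExecuteScalar, ExecuteNonQuery and so on all get retry capability wrapped around them, and these are what every Dapper endpoint ultimately calls.

This way, code changes are kept to a minimum: no need to touch the repos, only startup. Just by providing a wrapper/decorator around SqlClient's connection and command, you can inject a custom policy and retry with it.

After some research, my approach: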

public class SqlResiliencyPolicy
{
    // 1205 is the SQL Server error number for "chosen as the deadlock victim".
    private readonly ISet<int> transientDbErrors = new HashSet<int>(new[] { 1205 });
    private readonly ILogger _logger;
    private readonly IConfiguration _configuration;
    public SqlResiliencyPolicy(ILogger logger, IConfiguration configuration)
    {
        _logger = logger;
        _configuration = configuration;
    }
    public IAsyncPolicy GetSqlResiliencyPolicy(int transientErrorRetries = 3)
    {
        return Policy
            .Handle<SqlException>(ex => transientDbErrors.Contains(ex.Number))
            .WaitAndRetryAsync(
                retryCount: transientErrorRetries,
                sleepDurationProvider: attempt => TimeSpan.FromMilliseconds(attempt * 100),
                onRetry: (exception, sleepTime, reattempt, context) =>
                {
                    _logger.Log(LogLevel.Error, exception, $"Transient DB Failure while executing query, error number: {((SqlException)exception).Number}; reattempt number: {reattempt}");
                });
    }
}

In Startup:

services.AddScoped(_ => new SqlResiliencyPolicy(_logger, _configuration).GetSqlResiliencyPolicy());

Ctor DI: in the existing repos, inject the policy through the constructor into a private IAsyncPolicy backing field:

private readonly IAsyncPolicy _policy;

Last step: wrap all the Dapper calls with

await _policy.ExecuteAsync(async () => {<existing DB call>});
