Polly Retry with RX Observable.Interval



我是 Polly 的新手,我正在尝试应用重试策略,以便我可以让它在出现 IBMMQ 连接问题时手动处理重试连接。

请考虑以下代码:

public class ReconnectException : Exception
{
}
public class QueueMonitor : IObservable<Message>, IDisposable
{
private readonly MQQueue mqQueue;
private readonly MQQueueManager queueManager;
private readonly string queueName;
private IDisposable timer;
private readonly object lockObj = new object();
private bool isChecking;
private readonly TimeSpan checkingFrequency;
private readonly List<IObserver<Message>> observers;
private TimeSpan reconnectInterval;
private readonly IScheduler scheduler;
private readonly int maxReconnectCount;
private static readonly ILog Logger = LogProvider.For<AonQueueManager>();

private readonly Policy pollyPolicy;
public QueueMonitor(IConfiguration configuration, string queueName, IScheduler scheduler = null)
{
this.queueManager = QueueFactory.GetIstance(configuration);
this.queueName = queueName;
this.scheduler = scheduler ?? Scheduler.Default;
checkingFrequency = configuration.GetValue("checkingFrequency", new TimeSpan(0, 0, 5));
reconnectInterval = configuration.GetValue("reconnectInterval", new TimeSpan(0, 0, 5));
maxReconnectCount = configuration.GetValue("maxReconnectCount", 3);
observers = new List<IObserver<Message>>();
pollyPolicy = Policy.Handle<ReconnectException>().WaitAndRetry(maxReconnectCount, _ => TimeSpan.FromSeconds(2));
mqQueue = queueManager.AccessQueue(queueName,
MQC.MQOO_INPUT_AS_Q_DEF // open queue for input
+ MQC.MQOO_FAIL_IF_QUIESCING); // but not if MQM stopping
}
public void Start()
{
var x = pollyPolicy.ExecuteAndCapture(CreateTimer);
}
private void CreateTimer()
{
Logger.DebugFormat("Repeating timer started, checking frequency: {checkingFrequency}", checkingFrequency);
timer = Observable.Interval(checkingFrequency, scheduler).Subscribe(_ =>
{
lock (lockObj)
{
if (isChecking) return;
Logger.Log(LogLevel.Debug, () => "Listening on queues for new messages");
isChecking = true;
var mqMsg = new MQMessage();
var mqGetMsgOpts = new MQGetMessageOptions { WaitInterval = checkingFrequency.Milliseconds };
// 15 second limit for waiting
mqGetMsgOpts.Options |= MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING |
MQC.MQCNO_RECONNECT_Q_MGR | MQC.MQOO_INPUT_AS_Q_DEF;
try
{
mqQueue.Get(mqMsg, mqGetMsgOpts);
if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
{
var text = mqMsg.ReadString(mqMsg.MessageLength);
Logger.Debug($"Message received : [{text}]");
Message message = new Message { Content = text };
foreach (var observer in observers)
observer.OnNext(message);
}
else
{
Logger.Warn("Non-text message");
}
}
catch (MQException ex)
{
if (ex.Message == MQC.MQRC_NO_MSG_AVAILABLE.ToString())
{
Logger.Trace("No messages available");
//nothing to do, emtpy queue
}
else if (ex.Message == MQC.MQRC_CONNECTION_BROKEN.ToString())
{
Logger.ErrorException("MQ Exception, trying to recconect", ex);
throw new ReconnectException();
}
}
finally
{
isChecking = false;
}
}
});
}

public IDisposable Subscribe(IObserver<Message> observer)
{
if (!observers.Contains(observer))
observers.Add(observer);
return new Unsubscriber(observers, observer);
}
public void Dispose()
{
((IDisposable)mqQueue)?.Dispose();
((IDisposable)queueManager)?.Dispose();
timer?.Dispose();
}
}
public class Unsubscriber : IDisposable
{
private readonly List<IObserver<Message>> observers;
private readonly IObserver<Message> observer;
public Unsubscriber(List<IObserver<Message>> observers, IObserver<Message> observer)
{
this.observers = observers;
this.observer = observer;
}
public void Dispose()
{
if (observer != null) observers.Remove(observer);
}
}

我遇到的问题是,当在 lamda (throw new ReconnectException();( 中抛出异常时,Polly 没有捕获它(我明白为什么,因为它在另一个线程上(,并且应用程序退出,因为它在不同的线程上。

这段代码是库的一部分,所以我不知道是否在每个项目中正确处理全局异常。

如何让它被 Polly 的代码"捕获"?

提前致谢

问题中发布的代码仅将策略应用于创建计时器的操作(执行CreateTimer()(,而不是计时器执行的代码(.(Subscribe(_ => { })调用中的 lambda(。

这与呼叫CreateTimer()try { } catch { }包围时的行为相同。catch将仅涵盖执行CreateTimer()方法的行为,即计时器的创建。


为了使 Polly 策略管理在 lambda 中抛出的异常,需要在 lambda 中将其应用于预期会引发异常的相关语句块/组。

例如,您可以编写以下代码:

pollyPolicy.ExecuteAndCapture(() => mqQueue.Get(mqMsg, mqGetMsgOpts));

(使用配置为管理要处理的特定MQException的策略(。

或者,您可以将策略应用于更广泛的语句组 - 就像使用try { }子句一样。

pollyPolicy.ExecuteAndCapture(() => 
{
// ...
mqQueue.Get(mqMsg, mqGetMsgOpts));
// ...
}

相关内容

  • 没有找到相关文章

最新更新