捕获错误后,Rx .Net 不会继续序列



我对rx有点不熟悉,目前我正在使用它来调度一个cron作业,该作业通过grpc连续轮询服务器(在Stackoverflow上找到了解决方案)。

rx的轮询函数:

Observable.Interval(Timespan)
.Select(l => Observable.FromAsync(() => Job(m_CTS.Token)))
.Concat()
.Catch((Exception ex) => OnErrorFunc(ex))
.Subscribe(m_CTS.Token);`

处理发生错误的error函数:

private IObservable<System.Reactive.Unit> OnError(Exception aException)
{
LOG.Error("CronJob failed " + Job.Method+ " " + aException.Message );
return Observable.Empty<System.Reactive.Unit>();
}

当任务失败时,例如服务器不可用,抛出异常。Rx用error函数捕获错误,但不继续调度/触发任务。

显然,空的可观察对象不足以让序列继续。

我已经尝试返回

Observable.FromAsync(() => Job(m_CTS.Token)) 

但是这个也不行

让序列运行的正确返回类型是什么?或许我的方法是错的?.Retry是更好的选择吗?

为了消除这里的混乱,整个类:

public class CronJob : ICronJob
{
private CancellationTokenSource m_CTS;
private static readonly ILog LOG = LogManager.GetLogger( System.Reflection.MethodBase.GetCurrentMethod().DeclaringType );
public Func<CancellationToken,Task> Job{ get; private set; }
public Func<Exception,IObservable< System.Reactive.Unit>> OnErrorFunc { get; set; }
public TimeSpan Timespan;
public bool Running { get; private set; } = false;

public CronJob(Func<CancellationToken,Task> aJob, TimeSpan aTimeSpan, Func<Exception,IObservable<System.Reactive.Unit>> aOnErrorFunc = null)
{
Job= aJob;
Timespan = aTimeSpan;
OnErrorFunc = aOnErrorFunc ?? OnError;
m_CTS = new CancellationTokenSource();
}

public void StartInstantly()
{
StartTask();
Observable.Interval(Timespan)
.StartWith(-1L)
.Select(l => Observable.FromAsync(() => Job(m_CTS.Token)))
.Concat()
.Catch((Exception ex) => OnErrorFunc(ex))
.Subscribe(m_CTS.Token);
}
private void StartTask()
{
if (Running)
throw new Exception("Task allready started");
m_CTS = new CancellationTokenSource();
Running = true;
}
public void StartAfterTimeSpan()
{
StartTask();

Observable.Interval(Timespan)
.Select(l => Observable.FromAsync(() => Job(m_CTS.Token)))
.Concat()
.Catch((Exception ex) => OnErrorFunc(ex))
.Subscribe(m_CTS.Token);

}
public void Stop()
{
m_CTS.Cancel();
Running = false;
}
public void SetTask(Func<CancellationToken,Task> aJob)
{
if(Running)
Stop();
Job= aJob;
}
private IObservable<System.Reactive.Unit> OnError(Exception aException)
{
LOG.Error("CronJob failed " + Job.Method+ " " + aException.Message );
// return Observable.Empty<System.Reactive.Unit>();
return Observable.Empty<System.Reactive.Unit>();
}
}

Retry结合Catch更接近你所寻找的。

通常,异常会通过向订阅发送一个Error通知来终止可观察管道。Catch捕获错误通知,并将其替换为Catch参数中的新观察对象。在你的例子中,这是一个Empty:所以可观察管道将正常终止(带有一个Completed通知),但它仍然会终止。

Retry抑制错误通知并尝试重新订阅整个管道。像这样的东西可能是你想要的:

var retryLimit = 3;
Observable.Interval(Timespan)
.StartWith(-1L)
.Select(l => Observable.FromAsync(() => Task(m_CTS.Token)))
.Concat()
.Catch((Exception ex) => OnErrorFunc(ex))
.Retry(retryLimit) // will permanently die after 3 errors
.Subscribe(m_CTS.Token);

进一步阅读,我推荐这个文档:http://introtorx.com/Content/v1.0.10621.0/11_AdvancedErrorHandling.html#Catch

如果在@Enigmativity https://stackoverflow.com/a/70695753/11957956的回答中找到了我在SO上的一个老问题中缺少的内容

首先,我将public Func<CancellationToken,Task> Task重命名为public Func<CancellationToken,Task> Job以防止混淆。(我也编辑了原来的问题)

为了让它运行,我将Job包装成一个函数:

Func<long, Task> handler = async (i) =>
{
await Job(m_CTS.Token);
};

并像这样传递:

Observable
.Interval(Timespan)
.Select(i => Observable.FromAsync(() => handler(i))
.Catch<System.Reactive.Unit, Exception>(ex => OnErrorFunc(ex)))
.Concat()
.LastOrDefaultAsync()
.Subscribe(m_CTS.Token);

@Shlomo回答也会工作。哪一个是更好的解决方案,也许有更有经验的人可以插话,我会标记最好的答案。

相关内容

  • 没有找到相关文章

最新更新