我对rx有点不熟悉,目前我正在使用它来调度一个cron作业,该作业通过grpc连续轮询服务器(在Stackoverflow上找到了解决方案)。
rx的轮询函数:
Observable.Interval(Timespan)
.Select(l => Observable.FromAsync(() => Job(m_CTS.Token)))
.Concat()
.Catch((Exception ex) => OnErrorFunc(ex))
.Subscribe(m_CTS.Token);`
处理发生错误的error函数:
private IObservable<System.Reactive.Unit> OnError(Exception aException)
{
LOG.Error("CronJob failed " + Job.Method+ " " + aException.Message );
return Observable.Empty<System.Reactive.Unit>();
}
当任务失败时,例如服务器不可用,抛出异常。Rx用error函数捕获错误,但不继续调度/触发任务。
显然,空的可观察对象不足以让序列继续。
我已经尝试返回
Observable.FromAsync(() => Job(m_CTS.Token))
但是这个也不行
让序列运行的正确返回类型是什么?或许我的方法是错的?.Retry
是更好的选择吗?
为了消除这里的混乱,整个类:
public class CronJob : ICronJob
{
private CancellationTokenSource m_CTS;
private static readonly ILog LOG = LogManager.GetLogger( System.Reflection.MethodBase.GetCurrentMethod().DeclaringType );
public Func<CancellationToken,Task> Job{ get; private set; }
public Func<Exception,IObservable< System.Reactive.Unit>> OnErrorFunc { get; set; }
public TimeSpan Timespan;
public bool Running { get; private set; } = false;
public CronJob(Func<CancellationToken,Task> aJob, TimeSpan aTimeSpan, Func<Exception,IObservable<System.Reactive.Unit>> aOnErrorFunc = null)
{
Job= aJob;
Timespan = aTimeSpan;
OnErrorFunc = aOnErrorFunc ?? OnError;
m_CTS = new CancellationTokenSource();
}
public void StartInstantly()
{
StartTask();
Observable.Interval(Timespan)
.StartWith(-1L)
.Select(l => Observable.FromAsync(() => Job(m_CTS.Token)))
.Concat()
.Catch((Exception ex) => OnErrorFunc(ex))
.Subscribe(m_CTS.Token);
}
private void StartTask()
{
if (Running)
throw new Exception("Task allready started");
m_CTS = new CancellationTokenSource();
Running = true;
}
public void StartAfterTimeSpan()
{
StartTask();
Observable.Interval(Timespan)
.Select(l => Observable.FromAsync(() => Job(m_CTS.Token)))
.Concat()
.Catch((Exception ex) => OnErrorFunc(ex))
.Subscribe(m_CTS.Token);
}
public void Stop()
{
m_CTS.Cancel();
Running = false;
}
public void SetTask(Func<CancellationToken,Task> aJob)
{
if(Running)
Stop();
Job= aJob;
}
private IObservable<System.Reactive.Unit> OnError(Exception aException)
{
LOG.Error("CronJob failed " + Job.Method+ " " + aException.Message );
// return Observable.Empty<System.Reactive.Unit>();
return Observable.Empty<System.Reactive.Unit>();
}
}
Retry
结合Catch
更接近你所寻找的。
通常,异常会通过向订阅发送一个Error通知来终止可观察管道。Catch
捕获错误通知,并将其替换为Catch
参数中的新观察对象。在你的例子中,这是一个Empty
:所以可观察管道将正常终止(带有一个Completed通知),但它仍然会终止。
Retry
抑制错误通知并尝试重新订阅整个管道。像这样的东西可能是你想要的:
var retryLimit = 3;
Observable.Interval(Timespan)
.StartWith(-1L)
.Select(l => Observable.FromAsync(() => Task(m_CTS.Token)))
.Concat()
.Catch((Exception ex) => OnErrorFunc(ex))
.Retry(retryLimit) // will permanently die after 3 errors
.Subscribe(m_CTS.Token);
进一步阅读,我推荐这个文档:http://introtorx.com/Content/v1.0.10621.0/11_AdvancedErrorHandling.html#Catch
如果在@Enigmativity https://stackoverflow.com/a/70695753/11957956的回答中找到了我在SO上的一个老问题中缺少的内容
首先,我将public Func<CancellationToken,Task> Task
重命名为public Func<CancellationToken,Task> Job
以防止混淆。(我也编辑了原来的问题)
为了让它运行,我将Job包装成一个函数:
Func<long, Task> handler = async (i) =>
{
await Job(m_CTS.Token);
};
并像这样传递:
Observable
.Interval(Timespan)
.Select(i => Observable.FromAsync(() => handler(i))
.Catch<System.Reactive.Unit, Exception>(ex => OnErrorFunc(ex)))
.Concat()
.LastOrDefaultAsync()
.Subscribe(m_CTS.Token);
@Shlomo回答也会工作。哪一个是更好的解决方案,也许有更有经验的人可以插话,我会标记最好的答案。