使用可观察的 SelectMany进行反应式扩展错误处理



我正在尝试使用反应式扩展库在某个文件夹上编写文件观察器 这个想法是监视硬盘驱动器文件夹中的新文件,等到文件完全写入并将事件推送到订阅者。我不想使用FileSystemWatcher因为它为同一文件引发了两次更改事件。

所以我以"反应方式"(我希望(写了它,如下所示:

var provider = new MessageProviderFake();
var source = Observable.Interval(TimeSpan.FromSeconds(2), NewThreadScheduler.Default).SelectMany(_ => provider.GetFiles());
using (source.Subscribe(_ => Console.WriteLine(_.Name), () => Console.WriteLine("completed to Console")))
{
Console.WriteLine("press Enter to stop");
Console.ReadLine();
}

但是我找不到"反应方式"来处理错误。例如,文件目录可能位于外部驱动器上,但由于连接问题而变得不可用。 因此,我添加了GetFilesSafe来处理来自反应式扩展的异常错误:

static IEnumerable<MessageArg> GetFilesSafe(IMessageProvider provider)
{
try
{
return provider.GetFiles();
}
catch (Exception e)
{
Console.WriteLine(e.Message);
return new MessageArg[0];
}
}

并像这样使用它

var source = Observable.Interval(TimeSpan.FromSeconds(2), NewThreadScheduler.Default).SelectMany(_ => GetFilesSafe(provider));

有没有更好的方法让SelectMany即使在引发异常时也可以调用provider.GetFiles()?在这种情况下,我使用错误计数器重复读取操作 N 次,然后失败(终止进程(。

反应式扩展中是否有"尝试 N 次并在尝试之间等待 Q 秒"?

GetFilesSafe也存在一个问题:它返回IEnumerable<MessageArg>用于延迟阅读,但它可以在迭代时引发,并且异常将在SelectMany的某个地方抛出

有一个Retry扩展,如果当前扩展出错,它只会再次订阅可观察量,但听起来这不会提供您想要的灵活性。

您可以使用Catch构建一些东西,如果外部发生错误,它会订阅您提供给它的可观察量。类似于以下内容(未经测试(:

IObservable<Thing> GetFilesObs(int times, bool delay) {
return Observable
.Return(0)
.Delay(TimeSpan.FromSeconds(delay ? <delay_time> : 0))
.SelectMany(_ => Observable.Defer(() => GetFilesErroringObservable()))
.Catch(Observable.Defer(() => GetFilesObs(times - 1, true)));
}
// call with:
GetFilesObs(<number_of_tries>, false);

如前所述,除了触发重试外,这不会对错误执行任何操作。特别是,当发生了足够的错误时,它将完成而不会出错,这可能不是您想要的。

相关内容

  • 没有找到相关文章

最新更新