我正在尝试使用反应式扩展库在某个文件夹上编写文件观察器 这个想法是监视硬盘驱动器文件夹中的新文件,等到文件完全写入并将事件推送到订阅者。我不想使用FileSystemWatcher
因为它为同一文件引发了两次更改事件。
所以我以"反应方式"(我希望(写了它,如下所示:
var provider = new MessageProviderFake();
var source = Observable.Interval(TimeSpan.FromSeconds(2), NewThreadScheduler.Default).SelectMany(_ => provider.GetFiles());
using (source.Subscribe(_ => Console.WriteLine(_.Name), () => Console.WriteLine("completed to Console")))
{
Console.WriteLine("press Enter to stop");
Console.ReadLine();
}
但是我找不到"反应方式"来处理错误。例如,文件目录可能位于外部驱动器上,但由于连接问题而变得不可用。 因此,我添加了GetFilesSafe
来处理来自反应式扩展的异常错误:
static IEnumerable<MessageArg> GetFilesSafe(IMessageProvider provider)
{
try
{
return provider.GetFiles();
}
catch (Exception e)
{
Console.WriteLine(e.Message);
return new MessageArg[0];
}
}
并像这样使用它
var source = Observable.Interval(TimeSpan.FromSeconds(2), NewThreadScheduler.Default).SelectMany(_ => GetFilesSafe(provider));
有没有更好的方法让SelectMany
即使在引发异常时也可以调用provider.GetFiles()
?在这种情况下,我使用错误计数器重复读取操作 N 次,然后失败(终止进程(。
反应式扩展中是否有"尝试 N 次并在尝试之间等待 Q 秒"?
GetFilesSafe
也存在一个问题:它返回IEnumerable<MessageArg>
用于延迟阅读,但它可以在迭代时引发,并且异常将在SelectMany
的某个地方抛出
有一个Retry
扩展,如果当前扩展出错,它只会再次订阅可观察量,但听起来这不会提供您想要的灵活性。
您可以使用Catch
构建一些东西,如果外部发生错误,它会订阅您提供给它的可观察量。类似于以下内容(未经测试(:
IObservable<Thing> GetFilesObs(int times, bool delay) {
return Observable
.Return(0)
.Delay(TimeSpan.FromSeconds(delay ? <delay_time> : 0))
.SelectMany(_ => Observable.Defer(() => GetFilesErroringObservable()))
.Catch(Observable.Defer(() => GetFilesObs(times - 1, true)));
}
// call with:
GetFilesObs(<number_of_tries>, false);
如前所述,除了触发重试外,这不会对错误执行任何操作。特别是,当发生了足够的错误时,它将完成而不会出错,这可能不是您想要的。