文件监控系统响应式编程



我正在使用C#。我是响应式编程的新手。 使用响应式编程,我想创建一个文件夹监控系统,该系统将调用文件夹 A 是否包含任何文件,如果是,那么它将抓取该文件并处理它并将其移动到文件夹 B 中。 假设文件夹 A 首先是空的。用户将一些文件实时添加到文件夹 A 中。系统检测到新文件已添加,它将逐个或同时处理。 我无法理解我应该使用什么创建或间隔,之后我的处理代码将在哪里编写 请帮助我

这应该相当接近:

var query =
Observable
.Using(
() =>
{
var fsw = new FileSystemWatcher(@"C:A");
fsw.EnableRaisingEvents = true;
return fsw;
},
fsw => Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
h => fsw.Created += h,
h => fsw.Created -= h))
.Delay(TimeSpan.FromSeconds(0.1));

query
.Subscribe(x => File.Move(x.EventArgs.FullPath, Path.Combine(@"C:B", x.EventArgs.Name)));

FileSystemWatcher具有相对较小的InternalBufferSize(默认为 8 KB,最大 64 KB),如果在短时间内发生突发文件系统更改,并且FileSystemWatcher的事件处理程序正在执行任何耗时的操作,则很容易超过该。文档给出了以下建议:

使事件处理代码尽可能简短。

超出缓冲区的后果是严重的:所有缓冲的通知都将丢失。在大多数情况下,这应该是非常不可取的,如果不是完全不可接受的话。因此,要避免在事件调用的同一线程上同步执行繁重的文件移动操作。实现所需异步的一种简单方法是在处理程序和订阅代码之间注入Delay。更复杂的方法是对传入通知进行排队,并按顺序或有限并发处理每个文件。Merge运算符既可用于排队,也可用于并发控制。下面是一个示例¹:

IObservable<Unit> query = Observable
.Using(() =>
{
var fsw = new FileSystemWatcher(@"C:A");
fsw.EnableRaisingEvents = true;
return fsw;
},
fsw => Observable.FromEventPattern<FileSystemEventHandler,
FileSystemEventArgs>(h => fsw.Created += h, h => fsw.Created -= h)
)
.Delay(TimeSpan.FromSeconds(0.1))
.Select(x => Observable.Defer(() => Observable.Start(() =>
{
File.Move(x.EventArgs.FullPath, Path.Combine(@"C:B", x.EventArgs.Name));
})))
.Merge(maxConcurrent: 2);
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
Task<Unit> task = query.ToTask(cts.Token); // Start the file-watching

Observable.Defer+Observable.Start组合用作异步Observable.FromAsync的同步等效项(因为File.Move方法是同步的)。

¹它是Enigmativity示例的修改版本。

相关内容

  • 没有找到相关文章

最新更新