我正在使用C#。我是响应式编程的新手。 使用响应式编程,我想创建一个文件夹监控系统,该系统将调用文件夹 A 是否包含任何文件,如果是,那么它将抓取该文件并处理它并将其移动到文件夹 B 中。 假设文件夹 A 首先是空的。用户将一些文件实时添加到文件夹 A 中。系统检测到新文件已添加,它将逐个或同时处理。 我无法理解我应该使用什么创建或间隔,之后我的处理代码将在哪里编写 请帮助我
这应该相当接近:
var query =
Observable
.Using(
() =>
{
var fsw = new FileSystemWatcher(@"C:A");
fsw.EnableRaisingEvents = true;
return fsw;
},
fsw => Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
h => fsw.Created += h,
h => fsw.Created -= h))
.Delay(TimeSpan.FromSeconds(0.1));
query
.Subscribe(x => File.Move(x.EventArgs.FullPath, Path.Combine(@"C:B", x.EventArgs.Name)));
FileSystemWatcher
具有相对较小的InternalBufferSize
(默认为 8 KB,最大 64 KB),如果在短时间内发生突发文件系统更改,并且FileSystemWatcher
的事件处理程序正在执行任何耗时的操作,则很容易超过该。文档给出了以下建议:
使事件处理代码尽可能简短。
超出缓冲区的后果是严重的:所有缓冲的通知都将丢失。在大多数情况下,这应该是非常不可取的,如果不是完全不可接受的话。因此,要避免在事件调用的同一线程上同步执行繁重的文件移动操作。实现所需异步的一种简单方法是在处理程序和订阅代码之间注入Delay
。更复杂的方法是对传入通知进行排队,并按顺序或有限并发处理每个文件。Merge
运算符既可用于排队,也可用于并发控制。下面是一个示例¹:
IObservable<Unit> query = Observable
.Using(() =>
{
var fsw = new FileSystemWatcher(@"C:A");
fsw.EnableRaisingEvents = true;
return fsw;
},
fsw => Observable.FromEventPattern<FileSystemEventHandler,
FileSystemEventArgs>(h => fsw.Created += h, h => fsw.Created -= h)
)
.Delay(TimeSpan.FromSeconds(0.1))
.Select(x => Observable.Defer(() => Observable.Start(() =>
{
File.Move(x.EventArgs.FullPath, Path.Combine(@"C:B", x.EventArgs.Name));
})))
.Merge(maxConcurrent: 2);
var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));
Task<Unit> task = query.ToTask(cts.Token); // Start the file-watching
Observable.Defer
+Observable.Start
组合用作异步Observable.FromAsync
的同步等效项(因为File.Move
方法是同步的)。
¹它是Enigmativity示例的修改版本。