我一直在考虑在一个可观察对象中包装一个文件监视器,以帮助处理事件,但我遇到了一些麻烦,不知道如何得到我想要的行为。文件监视器监视存放文件的目录。当文件首次放入该目录时,在文件监视程序上触发Created事件。但是,如果文件很大或网络连接较慢,则在文件更新时触发一系列Changed事件。我不想在文件写完之前处理它所以我真正需要的是这个时间轴
|Created |Changed |Changed |Changed
________________________________________________
^Write starts ^Write finishes ^Processing Starts
我查看了Rx中过滤事件的许多方法,但我无法得到我需要的是"一旦文件文件在X秒内没有更改就触发函数"。节流是不好的,因为它会失去事件在中间。缓冲区不好,因为事件可能发生在缓冲区边界上。
我曾考虑过使用超时,但我并不疯狂,因为它们抛出了一个异常,我希望处理在文件被写入时开始,而不是一次,根本没有更多的事件。
在响应式扩展vs文件系统观察器中也有一个类似的问题,这个问题从来没有真正解决过。
是否有一种方法可以让我轻松地做到这一点?我确信这不是一个罕见的用例。
ObservableFileSystemWatcher
-一个围绕FileSystemWatcher
类型的可观察包装器-工作完美。添加一个名为ReactiveFileSystemWatcher
的NuGet包,并创建一个控制台应用程序进行测试,如下所示
class Program
{
static void Main(string[] args)
{
using (var watcher = new ObservableFileSystemWatcher(c => { c.Path = @"C:FolderToWatch"; c.IncludeSubdirectories = true; }))
{
watcher.Created.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine);
watcher.Changed.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine);
watcher.Renamed.Select(x => $"{x.OldName} was {x.ChangeType} to {x.Name}").Subscribe(Console.WriteLine);
watcher.Deleted.Select(x => $"{x.Name} was {x.ChangeType}").Subscribe(Console.WriteLine);
watcher.Errors.Subscribe(Console.WriteLine);
watcher.Start();
Console.ReadLine();
}
}
}
编辑:经过审查,我想你不会想要这个…
也许我过于简化了一点,但
Throttle
不是理想的吗?
这绝不是"简单",但我认为它比我之前的想法更接近你想要的:
(附带一个测试用例!;)
void Main()
{
var pathToWatch = @"c:temp";
var fsw = new FileSystemWatcher(pathToWatch);
// set up observables for create and changed
var changedObs =
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
dlgt => fsw.Changed += dlgt,
dlgt => fsw.Changed -= dlgt);
var createdObs =
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>(
dlgt => fsw.Created += dlgt,
dlgt => fsw.Created -= dlgt);
// the longest we'll wait between last file write and calling it "changed"
var maximumTimeBetweenWrites = TimeSpan.FromSeconds(1);
// A "pulse" ticking off every 10ms (adjust this as desired)
var timer = Observable
.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(10))
.Select(i => DateTime.Now);
var watcher =
from creation in createdObs
from change in changedObs
// we only care about changes matching a create
.Where(changeEvt => changeEvt.EventArgs.Name == creation.EventArgs.Name)
// take latest of (pulse, changes) and select (event, time since last file write)
.CombineLatest(timer, (evt, now) => new {
Change = evt,
DeltaFromLast = now.Subtract(new FileInfo(evt.EventArgs.FullPath).LastWriteTime)})
// skip all until we trigger than "time before considered changed" threshold
.SkipWhile(evt => evt.DeltaFromLast < maximumTimeBetweenWrites)
// Then lock on that until we change a diff file
.Distinct(evt => evt.Change.EventArgs.FullPath)
select change.Change;
var disp = new CompositeDisposable();
// to show creates
disp.Add(
createdObs.Subscribe(
evt => Console.WriteLine("New file:{0}",
evt.EventArgs.FullPath)));
// to show "final changes"
disp.Add(
watcher.Subscribe(
evt => Console.WriteLine("{0}:{1}:{2}",
evt.EventArgs.Name,
evt.EventArgs.ChangeType,
evt.EventArgs.FullPath)));
fsw.EnableRaisingEvents = true;
var rnd = new Random();
Enumerable.Range(0,10)
.AsParallel()
.ForAll(i =>
{
var filename = Path.Combine(pathToWatch, "foo" + i + ".txt");
if(File.Exists(filename))
File.Delete(filename);
foreach(var j in Enumerable.Range(0, 20))
{
var writer = File.AppendText(filename);
writer.WriteLine(j);
writer.Close();
Thread.Sleep(rnd.Next(500));
}
});
Console.WriteLine("Press enter to quit...");
Console.ReadLine();
disp.Dispose();
}
看看我在这个答案中的BufferWithInactivity
扩展方法。
我想你可以用它来查找变化的事件中的不活动
查看NuGet包Reactive FileSystemWatcher
源代码在GitHub页面