我已经编写了一些代码,将FileSystemWatcher
的Changed
事件转换为可观察的序列。
我的目标是将所有文件系统更改拆分为单独的流,并对其进行节流。
例如,如果我有10个不同的文件,它们在半秒内更改了3次,那么每个文件只会收到一次通知。
不过,让我担心的是GroupBy()
运算符。为了实现这一点,(我认为)需要随着时间的推移不断构建团队,并消耗少量内存。
这会导致"泄漏"吗?如果会,我该如何防止?
FileSystemWatcher _watcher = new FileSystemWatcher("d:\") {
EnableRaisingEvents = true,
NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.Size
};
void Main()
{
var fileSystemEventStream =
Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
(
_ => _watcher.Changed += _,
_ => _watcher.Changed -= _
)
.ObserveOn(ThreadPoolScheduler.Instance)
.SubscribeOn(ThreadPoolScheduler.Instance)
.GroupBy(ep => ep.EventArgs.FullPath, ep => ep.EventArgs.FullPath)
;
var res =
from fileGroup in fileSystemEventStream
from file in fileGroup.Throttle(TimeSpan.FromSeconds(1))
select file;
res.Subscribe(
ReceiveFsFullPath,
exception => {
Console.WriteLine ("Something went wrong - " + exception.Message + " " + exception.StackTrace);
});
Console.Read();
}
void ReceiveFsFullPath(string s){
Console.WriteLine ("Received file system event on thread " + Thread.CurrentThread.ManagedThreadId);
Console.WriteLine(s);
}
是的,对于每个新键,GroupBy都会创建一个Subject,并维护这些主题的字典。您正在订阅其中的每一个。因此,这是一小块内存,它将随着时间的推移而增长,而不会释放旧条目。你真正需要的是在油门计时器到期时取下钥匙。我想不出有什么方法可以用内置的运营商做到这一点。所以你需要一个自定义运算符。这里有一个刺。
public IObservable<T> ThrottleDistinct<T>(this IObservable<T> source, TimeSpan delay)
{
return Observable.Create(observer =>
{
var notifications = new Subject<IObservable<T>>();
var subscription = notifications.Merge().Subscribe(observer);
var d = new Dictionary<T, IObserver<T>>();
var gate = new object();
var sourceSubscription = new SingleAssignmentDisposable();
var subscriptions = new CompositeDisposable(subscription, sourceSubscription);
sourceSubscription.Disposable = source.Subscribe(value =>
{
IObserver<T> entry;
lock(gate)
{
if (d.TryGetValue(value, out entry))
{
entry.OnNext(value);
}
else
{
var s = new Subject<T>();
var o = s.Throttle(delay).FirstAsync().Do(() =>
{
lock(gate)
{
d.Remove(value);
}
});
notifications.OnNext(o);
d.Add(value, s);
s.OnNext(value);
}
}
}, observer.OnError, notifications.OnCompleted);
return subscriptions;
});
}
...
Observable.FromEventPattern(...)
.Select(e => e.EventArgs.FullPath)
.ThrottleDistinct(TimeSpan.FromSeconds(1))
.Subscribe(...);
根据Brandon的回复,受试者将成长,无法被回收*。我对内存泄漏的主要担忧是你没有捕获订阅!即
res.Subscribe(...
必须更换为
subscription = res.Subscribe(...
如果不捕获订阅,则永远无法处理订阅,因此永远无法释放事件处理程序,因此存在"内存泄漏"。显然,如果你不在某个地方真正处理订阅,这是没有用的。
*好吧,如果它们完成了,那么它们将被自动处理,所以这会起作用。当FileDeleted事件发生时,您可以完成一个序列吗?