行为真的很奇怪,它看起来像是框架本身的一个巨大错误,
我有一个FileSystemWatcher来检测文件夹中的新文件,每次检测到新文件时,都会在发送Rx.OnNext通知之前解析并删除该文件:
private Subject<MyObject> objectNotification = new Subject<MyObject>();
private FileSystemWatcher watcher;
private void MyClassConstructor(string pathToWatch)
{
watcher= new FileSystemWatcher();
watcher.Filter = scraper.Ext;
watcher.Created += new FileSystemEventHandler(parseMethod);
watcher.Path = pathToWatch;
watcher.EnableRaisingEvents = true;
}
private void parseMethod(object sender, FileSystemEventArgs e)
{
MyObject parsedFile = new MyObject(e.FullPath);
File.Delete(e.FullPath);
var syncedSubject = Subject.Synchronize(objectNotification);
syncedSubject.OnNext(parsedFile);
}
另一方面,在主应用程序的GUI代码中,有一个接收代码:
private IDisposable myObjectSubscription;
.
.
.
private void initSubscription()
{
this.myObjectSubscription = sendingClass.Subscribe(parsedObject =>
{
this.AddObject(parsedObject);
});
}
private void addObject(MyObject parsedObject)
{
//Sometimes the same object is being received many times, but has been sent only once!
//BUG!
}
理想情况下,我应该在观察者中收到sender类发送的每个值的通知,不幸的是,我得到了更多,更多!
可观测同步是这个问题的核心问题。
存在Subject或Observable同步方法以防止并发OnXXX
调用。
由于同步在不必要时引入了开销(这通常足够了),并且由于下面描述的替代方法,因此Rx API的用户需要决定何时以及如何同步。
在您的情况下,每次引发事件并调用事件处理程序时,parseMethod
都会为该事件创建一个新的同步主题。这不是您想要的,也不会阻止并发的OnNext()
调用。因此,由于观察者OnNext()
处理程序中的并发调用,您将看到一个竞争条件错误。
与其在parseMethod
中创建一个同步主题,不如在构造函数中创建,或者在订阅上同步,它应该可以工作。
例如:
要么这样做:
private ISubject<MyObject,MyObject> objectNotification =
Subject.Synchronized(new Subject<MyObject>());
...
private void parseMethod(object sender, FileSystemEventArgs e)
{
MyObject parsedFile = new MyObject(e.FullPath);
File.Delete(e.FullPath);
objectNotification.OnNext(parsedFile);
}
或者这样做:
private void initSubscription()
{
this.myObjectSubscription = sendingClass.Synchronize().Subscribe(parsedObject =>
{
this.AddObject(parsedObject);
});
}
请注意,在存在多个订阅者的情况下,这些备选方案具有不同的行为
在第一种情况下,您要确保在所有观察员的飞行中最多有一个OnNext()
。第一个事件转到第一个订阅者,然后转到第二个。。。直到所有订阅者都接收到该事件,然后发送第二个事件等
在第二种情况中,您引入了一些(通常是安全的)并发性,现在您要确保在所有观察者中,对于特定事件,最多有一个OnNext()
在运行中。换句话说,在不同的观察者,可以对不同的事件//strong>进行并发OnNext()
调用,但任何时候都只有一个给定事件的[/strong>OnNext()
调用在运行中,并且不会对同一观察者进行并发调用。
你喜欢哪一种取决于你,后者允许观察员并行运行,但仍然可以防止你看到的问题;哪种方法更好取决于上面没有提供的实现细节。