响应式扩展允许您轻松订阅使用Observable.FromEventPattern
的事件,但我找不到任何关于如何实现一个事件当你有一个IObservable
。
我的情况是:我需要实现一个包含事件的接口。该事件应该在我的对象的某个值发生变化时被调用,并且出于线程安全原因,我需要在某个SynchronizationContext
上调用此事件。我还应该调用每个事件处理程序与当前值注册。
public interface IFooWatcher
{
event FooChangedHandler FooChanged;
}
获得一个可观察的,做什么我想是相当容易与Rx使用BehaviorSubject
:
public class FooWatcher
{
private readonly BehaviorSubject<Foo> m_subject;
private readonly IObservable<Foo> m_observable;
public FooWatcher(SynchronizationContext synchronizationContext, Foo initialValue)
{
m_subject = new BehaviorSubject<Foo>(initialValue);
m_observable = m_subject
.DistinctUntilChanged()
.ObserveOn(synchronizationContext);
}
public event FooChangedHandler FooChanged
{
add { /* ??? */ }
remove { /* ??? */ }
}
}
现在我正在寻找一种简单的方法来让add
和remove
函数订阅和取消订阅传递的FooChangedHandler
作为m_observable
上的Observer<Foo>
。我当前的实现看起来像这样:
add
{
lock (m_lock)
{
IDisposable disp = m_observable.Subscribe(value);
m_registeredObservers.Add(
new KeyValuePair<FooChangedHandler, IDisposable>(
value, disp));
}
}
remove
{
lock (m_lock)
{
KeyValuePair<FooChangedHandler, IDisposable> observerDisposable =
m_registeredObservers
.First(pair => object.Equals(pair.Key, value));
m_registeredObservers.Remove(observerDisposable);
observerDisposable.Value.Dispose();
}
}
然而,我希望找到一个更简单的解决方案,因为我需要实现几个这样的事件(不同的处理程序类型)。我试着滚动我自己的通用解决方案,但它产生了一些需要解决的额外问题(特别是,你通常如何与一个接受T
参数的委托一起工作),所以我更愿意找到一个现有的解决方案,在这个方向上弥合差距-就像FromEventPattern
做相反的。
你可以这样做:
public event FooChangedHandler FooChanged
{
add { m_observable.ToEvent().OnNext += value; }
remove { m_observable.ToEvent().OnNext -= value; }
}
然而,在删除,我认为也许你只是想处理订阅…或者从ToEvent()获取Action并将其存储为成员。测试。
编辑:你将不得不使用Action而不是FooChangedHandler委托,然而。
编辑2:这是一个测试版本。我想你需要使用FooChangedHandler,然而,因为你有一堆这些预先存在的处理程序?void Main()
{
IObservable<Foo> foos = new [] { new Foo { X = 1 }, new Foo { X = 2 } }.ToObservable();
var watcher = new FooWatcher(SynchronizationContext.Current, new Foo { X = 12 });
watcher.FooChanged += o => o.X.Dump();
foos.Subscribe(watcher.Subject.OnNext);
}
// Define other methods and classes here
//public delegate void FooChangedHandler(Foo foo);
public interface IFooWatcher
{
event Action<Foo> FooChanged;
}
public class Foo {
public int X { get; set; }
}
public class FooWatcher
{
private readonly BehaviorSubject<Foo> m_subject;
public BehaviorSubject<Foo> Subject { get { return m_subject; } }
private readonly IObservable<Foo> m_observable;
public FooWatcher(SynchronizationContext synchronizationContext, Foo initialValue)
{
m_subject = new BehaviorSubject<Foo>(initialValue);
m_observable = m_subject
.DistinctUntilChanged();
}
public event Action<Foo> FooChanged
{
add { m_observable.ToEvent().OnNext += value; }
remove { m_observable.ToEvent().OnNext -= value; }
}
}
考虑到您已经混合了反应性和更正常的代码之间的界限,您可以做一个不那么反应性的版本。首先,简单地声明一个普通的事件模式
public event FooChangedHandler FooChanged;
protected void OnFooChanged(Foo)
{
var temp = FooChanged;
if (temp != null)
{
temp(new FooChangedEventArgs(Foo));
}
}
,然后在构造函数
中将可观察对象连接到它m_Observable.Subscribe(foo => OnFooChanged(foo));