创建对IObservable的弱订阅



我想做的是确保如果对我的观察者的唯一引用是可观察对象,它会被垃圾收集并停止接收消息。

假设我有一个控件,上面有一个名为Messages的列表框,后面的代码是:

//Short lived display of messages (only while the user's viewing incoming messages)
public partial class MessageDisplay : UserControl
{
    public MessageDisplay()
    {
        InitializeComponent();
        MySource.IncomingMessages.Subscribe(m => Messages.Items.Add(m));
    }
}

连接到这个源:

//Long lived location for message store
static class MySource
{
    public readonly static IObservable<string> IncomingMessages = new ReplaySubject<string>;
}

我不想要的是消息显示被保存在内存中很长时间后,它不再可见。理想情况下,我想要一个小扩展,所以我可以写:

MySource.IncomingMessages.ToWeakObservable().Subscribe(m => Messages.Items.Add(m));

我也不想依赖于MessageDisplay是一个用户控件的事实,因为我以后会想去一个MVVM设置与MessageDisplayViewModel这将不是一个用户控件。

你可以订阅一个代理观察者到持有对实际观察者的弱引用的可观察对象,并在实际观察者不再存活时处置订阅:

static IDisposable WeakSubscribe<T>(
    this IObservable<T> observable, IObserver<T> observer)
{
    return new WeakSubscription<T>(observable, observer);
}
class WeakSubscription<T> : IDisposable, IObserver<T>
{
    private readonly WeakReference reference;
    private readonly IDisposable subscription;
    private bool disposed;
    public WeakSubscription(IObservable<T> observable, IObserver<T> observer)
    {
        this.reference = new WeakReference(observer);
        this.subscription = observable.Subscribe(this);
    }
    void IObserver<T>.OnCompleted()
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnCompleted();
        else this.Dispose();
    }
    void IObserver<T>.OnError(Exception error)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnError(error);
        else this.Dispose();
    }
    void IObserver<T>.OnNext(T value)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnNext(value);
        else this.Dispose();
    }
    public void Dispose()
    {
        if (!this.disposed)
        {
            this.disposed = true;
            this.subscription.Dispose();
        }
    }
}

几年后我看到了这个帖子…只是想指出Samuel Jack的博客中提出的解决方案,该解决方案为IObservable添加了一个名为WeaklySubscribe的扩展方法。它使用一种方法,在主体和观察者之间添加一个shim,用WeakReference跟踪目标。这类似于其他人针对事件订阅中的强引用问题提供的解决方案,例如本文或Paul Stovell的解决方案。有一段时间,我使用了一些基于Paul的方法,我喜欢Samuel对弱IObservable订阅的解决方案。

还有另一个选择,使用weak-event-patterns

基本上System.Windows.WeakEventManager已经覆盖了你。

使用MVVM,当你的ViewModel依赖于带有事件的服务时,你可以弱订阅这些服务,允许你的ViewModel与视图一起收集,而不需要事件订阅使其存活。

using System;
using System.Windows;
class LongLivingSubject
{ 
    public event EventHandler<EventArgs> Notifications = delegate { }; 
}
class ShortLivingObserver
{
    public ShortLivingObserver(LongLivingSubject subject)
    { 
        WeakEventManager<LongLivingSubject, EventArgs>
            .AddHandler(subject, nameof(subject.Notifications), Subject_Notifications); 
    }
    private void Subject_Notifications(object sender, EventArgs e) 
    { 
    }
}

这是我的实现(很简单的一个)

public class WeakObservable<T>: IObservable<T>
{
    private IObservable<T> _source;
    public WeakObservable(IObservable<T> source)
    {
        #region Validation
        if (source == null)
            throw new ArgumentNullException("source");
        #endregion Validation
        _source = source;
    }
    public IDisposable Subscribe(IObserver<T> observer)
    {
        IObservable<T> source = _source;
        if(source == null)
            return Disposable.Empty;
        var weakObserver = new WaekObserver<T>(observer);
        IDisposable disp = source.Subscribe(weakObserver);
        return disp;
    }
}
    public class WaekObserver<T>: IObserver<T>
{
    private WeakReference<IObserver<T>> _target;
    public WaekObserver(IObserver<T> target)
    {
        #region Validation
        if (target == null)
            throw new ArgumentNullException("target");
        #endregion Validation
        _target = new WeakReference<IObserver<T>>(target);
    }
    private IObserver<T> Target
    {
        get
        {
            IObserver<T> target;
            if(_target.TryGetTarget(out target))
                return target;
            return null;
        }
    }
    #region IObserver<T> Members
    /// <summary>
    /// Notifies the observer that the provider has finished sending push-based notifications.
    /// </summary>
    public void OnCompleted()
    {
        IObserver<T> target = Target;
        if (target == null)
            return;
        target.OnCompleted();
    }
    /// <summary>
    /// Notifies the observer that the provider has experienced an error condition.
    /// </summary>
    /// <param name="error">An object that provides additional information about the error.</param>
    public void OnError(Exception error)
    {
        IObserver<T> target = Target;
        if (target == null)
            return;
        target.OnError(error);
    }
    /// <summary>
    /// Provides the observer with new data.
    /// </summary>
    /// <param name="value">The current notification information.</param>
    public void OnNext(T value)
    {
        IObserver<T> target = Target;
        if (target == null)
            return;
        target.OnNext(value);
    }
    #endregion IObserver<T> Members
}
    public static class RxExtensions
{
    public static IObservable<T> ToWeakObservable<T>(this IObservable<T> source)
    {
        return new WeakObservable<T>(source);
    }
}
        static void Main(string[] args)
    {
        Console.WriteLine("Start");
        var xs = Observable.Interval(TimeSpan.FromSeconds(1));
        Sbscribe(xs);
        Thread.Sleep(2020);
        Console.WriteLine("Collect");
        GC.Collect();
        GC.WaitForPendingFinalizers();
        GC.Collect();
        Console.WriteLine("Done");
        Console.ReadKey();
    }
    private static void Sbscribe<T>(IObservable<T> source)
    {
        source.ToWeakObservable().Subscribe(v => Console.WriteLine(v));
    }

关键是要认识到必须同时传入目标和双参数操作。单参数操作永远不会做到这一点,因为您要么对操作使用弱引用(并且该操作会被GC),要么对操作使用强引用,而强引用又对目标具有强引用,因此目标无法被GC。记住这一点,下面的工作:

using System;
namespace Closures {
  public static class WeakReferenceExtensions {
    /// <summary> returns null if target is not available. Safe to call, even if the reference is null. </summary>
    public static TTarget TryGetTarget<TTarget>(this WeakReference<TTarget> reference) where TTarget : class {
      TTarget r = null;
      if (reference != null) {
        reference.TryGetTarget(out r);
      }
      return r;
    }
  }
  public static class ObservableExtensions {
    public static IDisposable WeakSubscribe<T, U>(this IObservable<U> source, T target, Action<T, U> action)
      where T : class {
      var weakRef = new WeakReference<T>(target);
      var r = source.Subscribe(u => {
        var t = weakRef.TryGetTarget();
        if (t != null) {
          action(t, u);
        }
      });
      return r;
    }
  }
}

样本观测:

using System;
using System.Reactive.Subjects;
namespace Closures {
  public class Observable {
    public IObservable<int> ObservableProperty => _subject;
    private Subject<int> _subject = new Subject<int>();
    private int n;
    public void Fire() {
      _subject.OnNext(n++);
    }
  }
}

用法:

Class SomeClass {
 IDisposable disposable;
 public void SomeMethod(Observable observeMe) {
   disposable = observeMe.ObservableProperty.WeakSubscribe(this, (wo, n) => wo.Log(n));
 }
  public void Log(int n) {
    System.Diagnostics.Debug.WriteLine("log "+n);
  }
}

下面的代码的灵感来自于dtb的原始帖子。唯一的变化是它返回一个对观察者的引用,作为IDisposable的一部分。这意味着对IObserver的引用将保持活动状态,只要您保持对链末尾取出的IDisposable的引用(假设所有disposable都保持对它们之前的disposable的引用)。这允许使用扩展方法,如Subscribe(M=>DoSomethingWithM(M)),因为我们保留了对隐式构造的IObserver的引用,但我们没有保留从源到IObserver的强引用(这会产生内存溢出)。

using System.Reactive.Linq;
static class WeakObservation
{
    public static IObservable<T> ToWeakObservable<T>(this IObservable<T> observable)
    {
        return Observable.Create<T>(observer =>
            (IDisposable)new DisposableReference(new WeakObserver<T>(observable, observer), observer)
            );
    }
}
class DisposableReference : IDisposable
{
    public DisposableReference(IDisposable InnerDisposable, object Reference)
    {
        this.InnerDisposable = InnerDisposable;
        this.Reference = Reference;
    }
    private IDisposable InnerDisposable;
    private object Reference;
    public void Dispose()
    {
        InnerDisposable.Dispose();
        Reference = null;
    }
}
class WeakObserver<T> : IObserver<T>, IDisposable
{
    private readonly WeakReference reference;
    private readonly IDisposable subscription;
    private bool disposed;
    public WeakObserver(IObservable<T> observable, IObserver<T> observer)
    {
        this.reference = new WeakReference(observer);
        this.subscription = observable.Subscribe(this);
    }
    public void OnCompleted()
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnCompleted();
        else this.Dispose();
    }
    public void OnError(Exception error)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnError(error);
        else this.Dispose();
    }
    public void OnNext(T value)
    {
        var observer = (IObserver<T>)this.reference.Target;
        if (observer != null) observer.OnNext(value);
        else this.Dispose();
    }
    public void Dispose()
    {
        if (!this.disposed)
        {
            this.disposed = true;
            this.subscription.Dispose();
        }
    }
}

最新更新