跟踪可观察量中的观察者(数量)



我有一个可观察值,它代表股票价格流。如果我的可观察序列上没有观察者,我希望能够断开与提供价格流的远程服务器的连接,但我不想这样做,直到每个观察者都调用了 Dispose()。然后以类似的方式,当第一个人调用订阅时,我想重新连接到远程服务器。

有没有办法弄清楚有多少观察者在可观察量上调用了订阅?或者也许是一种知道观察者何时调用订阅或处置的方法?

我会简单地使用RefCount/Publish。我总是觉得如果我在实现IObservable,我工作得太辛苦了。

myColdObservable.Publish().RefCount();

这将使您的可观察对象在每个人都断开连接后停止脉冲。 下面是一个示例:

var coldObservable = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .ObserveOn(Scheduler.TaskPool)
    .Select(_ => DoSomething());
var refCountObs = coldObservable.Publish().RefCount();
CompositeDisposable d = new CompositeDisposable();
d.Add(refCountObs.Subscribe(n => Console.WriteLine("First got: " + n)));
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Second got: " + n)));
d.Add(refCountObs.Subscribe(n => Console.WriteLine("Third got: " + n)));
//Wait a bit for work to happen
System.Threading.Thread.Sleep(10000);
//Everyone unsubscribes
d.Dispose();
//Observe that DoSomething is not called.
System.Threading.Thread.Sleep(3000);

不包括您实际上想知道订阅者数量的情况,但我认为这符合您在没有订阅者时停止工作的要求。

有点旧,但我遇到了这篇文章,因为我遇到了一个问题,我需要知道订阅者的数量。根据巴特的建议,我想出了这个扩展。

public static IObservable<T> CountSubscribers<T>(this IObservable<T> source, Action<int> countChanged)
{
 int count = 0;
 return Observable.Defer(() =>
 {
    count = Interlocked.Increment(ref count);
    countChanged(count);
    return source.Finally(() =>
     {
        count = Interlocked.Decrement(ref count);
        countChanged(count);
     });
 });
}

一般来说,不要实现 IObservable;通常 Rx 中已经有 soemthing 可以直接或通过组合来帮助你。如果您必须实现IObservable,请使用Observable.Create来实现,以便获得观察者合约等所需的所有保证。

至于你的问题 - 使用Publish和RefCount的建议正是你正在寻找的构图。如果出于某种原因想要计算自己,请使用 Observable.Defer 拦截订阅,可能使用 Observable.Last 来拦截序列终止。或者,使用 Observable.Create 包装源,将观察者转发到包装的序列,并使用计数逻辑包装返回的 IDisposable(使用 Disposable.Create)。

干杯

-巴特(Rx团队)

IObservable<T>是一个

可以实现的接口。在接口的订阅方法中,您可以通过在内部维护列表来跟踪观察者。

以下代码片段来自 MSDN。

private List<IObserver<Location>> observers;
public IDisposable Subscribe(IObserver<Location> observer) 
{
   if (! observers.Contains(observer)) 
      observers.Add(observer);
   // ------- If observers.Count == 1 create connection. -------
   return new Unsubscriber(observers, observer);
}
private class Unsubscriber : IDisposable
{
   private List<IObserver<Location>>_observers;
   private IObserver<Location> _observer;
   public Unsubscriber(List<IObserver<Location>> observers, IObserver<Location> observer)
   {
      this._observers = observers;
      this._observer = observer;
   }
   public void Dispose()
   {
      if (_observer != null && _observers.Contains(_observer))
         _observers.Remove(_observer);
      // ----------- if observers.Count == 0 close connection -----------
   }
}

相关内容

  • 没有找到相关文章

最新更新