如何管理无限可观察量的生命周期,当它是由订阅者而不是源决定的



我正在使用IObservable推送数据更新/更改,我有一个从数据库GetLatestElement获取最新数据的方法,每当有人调用UpdateElement并且数据得到更新时,消息都会通过消息传递系统分发。

因此,我正在创建一个发出最新值的可观察量,然后在它从消息传递系统收到更新事件时重新发出新值:

public IObservable<IElement> GetElement(Guid id)
{
return Observable.Create<T>((observer) =>
{
observer.OnNext(GetLatestElement(id));
// subscribe to internal or external update notifications
var messageCallback = (message) =>
{
// new update message recieved,
observer.OnNext(GetLatestElement(id));
}
messageService.SubscribeToTopic(id, messageCallback);
return Disposable.Create(() => Console.Writeline("Observer Disposed"));
});
}

我的问题是这是不确定的。这些更新可能会永远发生。由于我试图使系统尽可能无状态,因此为每个GetElementType请求创建了一个新的可观察量。这意味着生存期由订阅者决定,而不是数据源。

我永远不会在可观察量中调用OnComplete(),我想在观察者/用户完成后完成。

但是,我需要在某个时间点调用messageService.Unsubscribe(messageCallback);,以便在 Observable 完成后取消订阅消息。

我可以在释放订阅时执行此操作,但随后我只能订阅一次,这似乎可能会引入错误。

应该如何使用可观察量完成此操作?

似乎对Observable.Create的工作方式存在一些误解。每当您根据GetElement()的结果调用Subscribe时,都会执行Observable.Create的主体。因此,对于每个订阅者,您都有单独的messageService订阅,并执行单独的回调。如果您取消订阅 - 则只会删除订阅者的订阅。所有其他保持活动状态,因为它们有自己的messageCallback。这当然是假设messageService得到正确实施。下面是示例应用程序,说明:

static IElement  GetLatestElement(Guid id) {
return new Element();
}
public class Element : IElement {
}
public interface IElement {
}
class MessageService {
private Dictionary<Guid, Dictionary<Action<IElement>, CancellationTokenSource>> _subs = new Dictionary<Guid, Dictionary<Action<IElement>, CancellationTokenSource>>();
public void SubscribeToTopic(Guid id, Action<IElement> callback) {
var ct = new CancellationTokenSource();
if (!_subs.ContainsKey(id))
_subs[id] = new Dictionary<Action<IElement>, CancellationTokenSource>();
_subs[id].Add(callback, ct);
Task.Run(() =>
{
while (!ct.IsCancellationRequested) {
callback(new Element());
Thread.Sleep(500);
}
});
}
public void Unsubscribe(Guid id, Action<IElement> callback) {
_subs[id][callback].Cancel();
_subs[id].Remove(callback);
}
}
public static IObservable<IElement> GetElement(Guid id)
{
var messageService = new MessageService();
return Observable.Create<IElement>((observer) =>
{
observer.OnNext(GetLatestElement(id));
// subscribe to internal or external update notifications
Action<IElement> messageCallback = (message) =>
{
// new update message recieved,
observer.OnNext(GetLatestElement(id));
};
messageService.SubscribeToTopic(id, messageCallback);
return Disposable.Create(() => {
messageService.Unsubscribe(id, messageCallback);
Console.WriteLine("Observer Disposed");
});
});
}
public static void Main(string[] args) {
var ob = GetElement(Guid.NewGuid());
var sub1 = ob.Subscribe(c =>
{
Console.WriteLine("got element");
});
var sub2 = ob.Subscribe(c =>
{
Console.WriteLine("got element 2");
});
// at this point we see both subscribers receive messages
Console.ReadKey();
sub1.Dispose();
// first one is unsubscribed, but second one is still alive
Console.ReadKey();
}

所以正如我所说,评论 - 我认为在这种情况下没有理由完成你的可观察性。

正如 Evk 指出的那样,Observable.Create运行几乎立即处理。如果您想保持messageService订阅打开状态,Rx 可以帮助您。看看MessageObservableProvider.剩下的只是让东西编译:

public class MessageObservableProvider
{
private MessageService messageService;
private Dictionary<Guid, IObservable<Unit>> _messageNotifications = new Dictionary<Guid, IObservable<Unit>>();
private IObservable<Unit> GetMessageNotifications(Guid id)
{
return Observable.Create<Unit>((observer) =>
{
Action<Message> messageCallback = _ => observer.OnNext(Unit.Default);
messageService.SubscribeToTopic(id, messageCallback);
return Disposable.Create(() =>
{
messageService.Unsubscribe(messageCallback);
Console.WriteLine("Observer Disposed");
});
});
}
public IObservable<IElement> GetElement(Guid id)
{
if(!_messageNotifications.ContainsKey(id))
_messageNotifications[id] = GetMessageNotifications(id).Publish().RefCount();
return _messageNotifications[id]
.Select(_ => GetLatestElement(id))
.StartWith(GetLatestElement(id));
}
private IElement GetLatestElement(Guid id)
{
throw new NotImplementedException();
}
}
public class IElement { }
public class Message { }
public class MessageService
{
public void SubscribeToTopic(Guid id, Action<Message> callback)
{
throw new NotImplementedException();
}
public void Unsubscribe(Action<Message> callback)
{
throw new NotImplementedException();
}
}

您的原始Create实现包含StartWithSelect的功能。我把它们移了出来,所以现在Observable.Create只是在新消息可用时返回通知。

更重要的是,在GetElement现在有一个.Publish().RefCount()电话。这将使messageService订阅保持打开状态(通过不调用.Dispose()),只要至少有一个子项可观察(订阅)在周围。

最新更新