我正在使用IObservable推送数据更新/更改,我有一个从数据库GetLatestElement
获取最新数据的方法,每当有人调用UpdateElement
并且数据得到更新时,消息都会通过消息传递系统分发。
因此,我正在创建一个发出最新值的可观察量,然后在它从消息传递系统收到更新事件时重新发出新值:
public IObservable<IElement> GetElement(Guid id)
{
return Observable.Create<T>((observer) =>
{
observer.OnNext(GetLatestElement(id));
// subscribe to internal or external update notifications
var messageCallback = (message) =>
{
// new update message recieved,
observer.OnNext(GetLatestElement(id));
}
messageService.SubscribeToTopic(id, messageCallback);
return Disposable.Create(() => Console.Writeline("Observer Disposed"));
});
}
我的问题是这是不确定的。这些更新可能会永远发生。由于我试图使系统尽可能无状态,因此为每个GetElementType
请求创建了一个新的可观察量。这意味着生存期由订阅者决定,而不是数据源。
我永远不会在可观察量中调用OnComplete()
,我想在观察者/用户完成后完成。
但是,我需要在某个时间点调用messageService.Unsubscribe(messageCallback);
,以便在 Observable 完成后取消订阅消息。
我可以在释放订阅时执行此操作,但随后我只能订阅一次,这似乎可能会引入错误。
应该如何使用可观察量完成此操作?
似乎对Observable.Create
的工作方式存在一些误解。每当您根据GetElement()
的结果调用Subscribe
时,都会执行Observable.Create
的主体。因此,对于每个订阅者,您都有单独的messageService
订阅,并执行单独的回调。如果您取消订阅 - 则只会删除该订阅者的订阅。所有其他保持活动状态,因为它们有自己的messageCallback
。这当然是假设messageService
得到正确实施。下面是示例应用程序,说明:
static IElement GetLatestElement(Guid id) {
return new Element();
}
public class Element : IElement {
}
public interface IElement {
}
class MessageService {
private Dictionary<Guid, Dictionary<Action<IElement>, CancellationTokenSource>> _subs = new Dictionary<Guid, Dictionary<Action<IElement>, CancellationTokenSource>>();
public void SubscribeToTopic(Guid id, Action<IElement> callback) {
var ct = new CancellationTokenSource();
if (!_subs.ContainsKey(id))
_subs[id] = new Dictionary<Action<IElement>, CancellationTokenSource>();
_subs[id].Add(callback, ct);
Task.Run(() =>
{
while (!ct.IsCancellationRequested) {
callback(new Element());
Thread.Sleep(500);
}
});
}
public void Unsubscribe(Guid id, Action<IElement> callback) {
_subs[id][callback].Cancel();
_subs[id].Remove(callback);
}
}
public static IObservable<IElement> GetElement(Guid id)
{
var messageService = new MessageService();
return Observable.Create<IElement>((observer) =>
{
observer.OnNext(GetLatestElement(id));
// subscribe to internal or external update notifications
Action<IElement> messageCallback = (message) =>
{
// new update message recieved,
observer.OnNext(GetLatestElement(id));
};
messageService.SubscribeToTopic(id, messageCallback);
return Disposable.Create(() => {
messageService.Unsubscribe(id, messageCallback);
Console.WriteLine("Observer Disposed");
});
});
}
public static void Main(string[] args) {
var ob = GetElement(Guid.NewGuid());
var sub1 = ob.Subscribe(c =>
{
Console.WriteLine("got element");
});
var sub2 = ob.Subscribe(c =>
{
Console.WriteLine("got element 2");
});
// at this point we see both subscribers receive messages
Console.ReadKey();
sub1.Dispose();
// first one is unsubscribed, but second one is still alive
Console.ReadKey();
}
所以正如我所说,评论 - 我认为在这种情况下没有理由完成你的可观察性。
正如 Evk 指出的那样,Observable.Create
运行几乎立即处理。如果您想保持messageService
订阅打开状态,Rx 可以帮助您。看看MessageObservableProvider
.剩下的只是让东西编译:
public class MessageObservableProvider
{
private MessageService messageService;
private Dictionary<Guid, IObservable<Unit>> _messageNotifications = new Dictionary<Guid, IObservable<Unit>>();
private IObservable<Unit> GetMessageNotifications(Guid id)
{
return Observable.Create<Unit>((observer) =>
{
Action<Message> messageCallback = _ => observer.OnNext(Unit.Default);
messageService.SubscribeToTopic(id, messageCallback);
return Disposable.Create(() =>
{
messageService.Unsubscribe(messageCallback);
Console.WriteLine("Observer Disposed");
});
});
}
public IObservable<IElement> GetElement(Guid id)
{
if(!_messageNotifications.ContainsKey(id))
_messageNotifications[id] = GetMessageNotifications(id).Publish().RefCount();
return _messageNotifications[id]
.Select(_ => GetLatestElement(id))
.StartWith(GetLatestElement(id));
}
private IElement GetLatestElement(Guid id)
{
throw new NotImplementedException();
}
}
public class IElement { }
public class Message { }
public class MessageService
{
public void SubscribeToTopic(Guid id, Action<Message> callback)
{
throw new NotImplementedException();
}
public void Unsubscribe(Action<Message> callback)
{
throw new NotImplementedException();
}
}
您的原始Create
实现包含StartWith
和Select
的功能。我把它们移了出来,所以现在Observable.Create
只是在新消息可用时返回通知。
更重要的是,在GetElement
现在有一个.Publish().RefCount()
电话。这将使messageService
订阅保持打开状态(通过不调用.Dispose()
),只要至少有一个子项可观察(订阅)在周围。