我有一个SignalR集线器,它监听客户端请求,并使用Rx.NET观察数据库表,以便在更新可用时将更新发回请求更新的客户端。但是,根据客户端请求,在集线器中创建的Observer
实例似乎在方法调用完成后立即被销毁(由GC?);因此,我没有得到任何更新。
这是我当前的Hub
实现:
public class BookHub : Hub {
private readonly BookService _service = new BookService();
public void RequestBookUpdate(string author) {
BookObserver observer = new BookObserver(Context.connectionId);
IDisposable unsubscriber = _service.RequestBookUpdate(author, observer);
}
}
BookService
返回转换为Observable
:的LINQ查询
public IDisposable RequestBookUpdate(string author, BookObserver observer) {
var query = from b in db.Book where b.Author.Contains(author) select b;
IObservable<Book> observable = query.ToObservable();
IDisposable unsubscriber = observable.Subscribe(observer);
return unsubscriber;
}
BookObserver
只是将新检索到的项目发送回请求更新的特定客户端(由connectionId
标识):
// omissis
private static readonly IHubContext _context = GlobalHost.ConnectionManager.GetHubContext<BookHub>();
private readonly string _connectionId;
public BookObserver(string connectionId) {
connectionId = _connectionId:
}
public void OnNext(Book value) {
_context.Clients.Client(_connectionId).foundNewBook(value);
}
我不关心BookService
实例被破坏,但我希望BookObserver
保持活动状态,因此只有当客户端断开连接时,我才能调用unsubscriber.Dispose()
。这可能吗?
当Observer
接收到对OnComplete
的调用时,它将被自动释放。这实际上是一个非常好的模式,因为这意味着你不必像这样手动处理Subscription
:
Observable.Range(0, 100)
.Subscribe(...);
或
Observable.Interval(TimeSpan.FromSeconds(1))
.Take(10)
.Subscribe();
因此,为了确保你的观察者在想要被处理之前不会被处理,你可以插入另一个对你的源来说是空的、永远不完整的、可观察的。
IObservable<Book> observable = query.ToObservable()
.Concat(Observable.Never<Book>());
然而,根据你试图做的事情,最好在其他地方处理,比如客户。