我正在研究一些使用。net (Rx)响应式扩展的东西,我想有一个序列,从队列(或类似的)中获取输入。
我已经试过了:
static readonly Queue<DeviceTransaction> TransactionQueue = new Queue<DeviceTransaction>();
//...
var observableTransactionSource = TransactionQueue.ToObservable();
//...
observableTransactionSource.Subscribe(transactionObserver);
它工作到一定程度,但是当队列为空时序列完成。我不想让空队列结束序列。空并不意味着结束,它只意味着"现在没有了"。
是否有一种方法可以在队列为空时停止序列完成,或者我应该以不同的方式思考整个问题?
调用ToObservable()
充满了问题,正如我在这里解释的那样,它只是要使用IEnumerable<T>
并消耗队列的快照。
在这种情况下,使用Subject<T>
来支持事件可能要好得多。由于Rx语法指定您必须序列化事件交付,因此已经具有排队语义。只需在主题上调用OnNext<T>
来发布事件。
如果您需要确保在事件发布后发生的订阅不会错过事件,请使用ReplaySubject<T>
。
如果使用主题与您有关,那么您可能需要回顾这篇博客文章。总而言之,您对队列的使用表明在这里使用主题是可以的,但您可能要考虑是否可以使用Observable.FromEvent
之类的转换方法。