使用RxJava创建响应队列或数据结构



我需要创建一个队列,当一个项目被添加到队列中时,订阅者应该得到通知。我想到的唯一解决方案是使用while循环来轮询队列或使用peek队列比较是否将新对象添加到队列中,如果发现新项目,则使用onNext()将项目转发给订阅者。这将是一个无限循环,这就是我需要观察队列的

您可以将其中一个队列子类化为add/offer/等…方法调用您的Observable

public class SubscriberQueue<E> extends LinkedList<E> {
   private Observable<E> mObservable = PublishSubscriber.create();
   @Override
   public boolean add(E e) {
      if (super.add()) {
         mObservable.onNext(e);
      }
   }
   public Observable<E> getObservable() {return mObservable;}
}
// Test code
SubscriberQueue<Integer> myQueue = new SubscriberQueue<>();
myQueue.getObservable().subscribe(i -> System.out.println("Added: " + i));
myQueue.add(1);