像 RxJava 中的 Subject 一样排队



我正在寻找一个主题(或类似的东西),可以:

  1. 如果没有订阅者,可以接收项目并将其保存在队列或缓冲区中
  2. 一旦我们有了订阅者,所有项目都会被消耗,再也不会发出
  3. 我可以订阅/取消订阅主题

BehaviorSubject几乎可以完成这项工作,但它保留了最后观察到的项目。

更新

根据接受的答案,我为单个观察到的项目制定了类似的解决方案。还添加了退订部分以避免内存泄漏。

class LastEventObservable private constructor(
        private val onSubscribe: OnSubscribe<Any>,
        private val state: State
) : Observable<Any>(onSubscribe) {
    fun emit(value: Any) {
        if (state.subscriber.hasObservers()) {
            state.subscriber.onNext(value)
        } else {
            state.lastItem = value
        }
    }
    companion object {
        fun create(): LastEventObservable {
            val state = State()
            val onSubscribe = OnSubscribe<Any> { subscriber ->
                just(state.lastItem)
                        .filter { it != null }
                        .doOnNext { subscriber.onNext(it) }
                        .doOnCompleted { state.lastItem = null }
                        .subscribe()
                val subscription = state.subscriber.subscribe(subscriber)
                subscriber.add(Subscriptions.create { subscription.unsubscribe() })
            }
            return LastEventObservable(onSubscribe, state)
        }
    }
    private class State {
        var lastItem: Any? = null
        val subscriber = PublishSubject.create<Any>()
    }
}

我创建了一个自定义的可观察量,该可观察量包装发布主题并在没有附加订阅者的情况下处理排放缓存。看看吧。

public class ExampleUnitTest {
    @Test
    public void testSample() throws Exception {
        MyCustomObservable myCustomObservable = new MyCustomObservable();
        myCustomObservable.emit("1");
        myCustomObservable.emit("2");
        myCustomObservable.emit("3");
        Subscription subscription = myCustomObservable.subscribe(System.out::println);
        myCustomObservable.emit("4");
        myCustomObservable.emit("5");
        subscription.unsubscribe();
        myCustomObservable.emit("6");
        myCustomObservable.emit("7");
        myCustomObservable.emit("8");
        myCustomObservable.subscribe(System.out::println);
    }
}
class MyCustomObservable extends Observable<String> {
    private static PublishSubject<String> publishSubject = PublishSubject.create();
    private static List<String> valuesCache = new ArrayList<>();
    protected MyCustomObservable() {
        super(subscriber -> {
            Observable.from(valuesCache)
                    .doOnNext(subscriber::onNext)
                    .doOnCompleted(valuesCache::clear)
                    .subscribe();
            publishSubject.subscribe(subscriber);
        });
    }
    public void emit(String value) {
        if (publishSubject.hasObservers()) {
            publishSubject.onNext(value);
        } else {
            valuesCache.add(value);
        }
    }
}

希望对您有所帮助!

此致敬意。

如果您只想等待单个订阅者,请使用UnicastSubject但请注意,如果您在中间取消订阅,则所有后续排队的项目都将丢失。

编辑:

一旦我们有了订阅者,所有项目都会被消耗,再也不会发出

对于多个订阅者,请使用 ReplaySubject

我有类似的问题,我的要求是:

  • 应支持在没有观察者订阅时重播值
  • 一次只允许订阅一个观察者
  • 应允许另一个观察者在处置第一个观察者时订阅

我已经将其实现为 RxRelay,但主题的实现将是类似的:

public final class CacheRelay<T> extends Relay<T> {
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
    private final PublishRelay<T> relay = PublishRelay.create();
    private CacheRelay() {
    }
    public static <T> CacheRelay<T> create() {
        return new CacheRelay<>();
    }
    @Override
    public void accept(T value) {
        if (relay.hasObservers()) {
            relay.accept(value);
        } else {
            queue.add(value);
        }
    }
    @Override
    public boolean hasObservers() {
        return relay.hasObservers();
    }
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (hasObservers()) {
            EmptyDisposable.error(new IllegalStateException("Only a single observer at a time allowed."), observer);
        } else {
            for (T element; (element = queue.poll()) != null; ) {
                observer.onNext(element);
            }
            relay.subscribeActual(observer);
        }
    }
}

查看此要点了解更多信息

最新更新