RxJava订阅阻塞可观察对象



我想实现以下目标:

String result = myObservable.toBlocking().first();

。它就像一个普通的函数调用。然而,这种情况永远不会发生,因为你需要订阅它,我不知道该怎么做。如果我订阅了它,结果将在另一个作用域中,代码非常难看,因为我只能像它是一个常规的可观察对象那样获得结果,所以没有必要把它变成一个阻塞的可观察对象。

它实际上是你想要的:

    Observable<String> myObservable = Observable.just("firstValue", "secondValue");
    String result = myObservable.toBlocking().first();
    System.out.println(result); // ---> "firstValue"

在底层,调用BlockingObservable.first()为您做订阅:

private T blockForSingle(final Observable<? extends T> observable) {
    final AtomicReference<T> returnItem = new AtomicReference<T>();
    final AtomicReference<Throwable> returnException = new AtomicReference<Throwable>();
    final CountDownLatch latch = new CountDownLatch(1);
    @SuppressWarnings("unchecked")
    Subscription subscription = ((Observable<T>)observable).subscribe(new Subscriber<T>() {
        @Override
        public void onCompleted() {
            latch.countDown();
        }
        @Override
        public void onError(final Throwable e) {
            returnException.set(e);
            latch.countDown();
        }
        @Override
        public void onNext(final T item) {
            returnItem.set(item);
        }
    });
    BlockingUtils.awaitForComplete(latch, subscription);
    if (returnException.get() != null) {
        Exceptions.propagate(returnException.get());
    }
    return returnItem.get();
}

UPDATE:如果使用BehaviourSubject + toBlocking()没有任何意义。考虑到它是ObservableObserver,所以在某个地方,myObservable.onNext("value")应该被调用。如果你通过调用toBlocking()来阻塞线程,除非myObservable在调用onNext()的其他线程中可用,否则你将被阻塞。

例如,这是一个"BehaviourSubject"的正常用法:

  // observer will receive the "one", "two" and "three" events, but not "zero"
  BehaviorSubject<Object> subject = BehaviorSubject.create("default");
  subject.onNext("zero");
  subject.onNext("one");
  subject.subscribe(observer);
  subject.onNext("two");
  subject.onNext("three");

相关内容

  • 没有找到相关文章

最新更新