异步、提前退出、级联的Observable



假设我们有3个Observable:A、B、C。我需要同时运行所有3个(通俗地说,就是异步运行),但是:

  1. 如果我从A得到任何东西,请发射它…不要发射任何其他东西
  2. 如果A完成而不发出任何内容,请将规则1应用于B
  3. 如果B完成而不发射任何东西,则从C发射项目
  4. 如果C完成而不发出任何内容,则发出默认项

昨天我花了几个小时试图弄清楚这一点,RxJava中似乎还没有任何操作组合可以让我做到这一点。

你可以想到从左到右级联的值:

A-->B-->C

此外,级联是阻塞式的,而每一级都异步运行并缓存其值。

A(无)-->B(无)-->C(无)-->默认项

为了清楚起见:在任何其他Observable发射任何内容之前,A必须先完成。B和C的逻辑相同;如果A、B、C都没有发射任何内容,则发射默认项。

显然,这涉及到缓存:我绝对不想重新执行(重新订阅)这些Observable,而只需要重放已缓存的值。每一级都像一道门一样阻塞等待。

该行为与concat()非常相似,只是如果链中靠前的部分已经发射过数据,则后面的部分不会再被发射。

以下是我的想法:

**
 * Works like {@link rx.Observable#concat} but concatenated Observables
 * are all run immediately on their given {@link rx.Scheduler}.
 *
 * This Observable is blocking in the sense that items are emitted in order
 * like {@link rx.Observable#concat} but since each Observable is run on
 * an (possibly) asynchronous scheduler, items emitted further down the chain
 * of Observables are held until items further up the chain are (possibly) emitted.
 *
 * This Observable also short-circuits and does not emit items further down
 * the chain of Observables when an Observable higher up the chain emits items.
 *
 * For example:
 *
 * Given Observable A, B, and C
 *
 * If A emits item(s) emit them... do not emit anything else.
 * If A completes without emitting anything, apply previous rule to B.
 * If B completes without emitting anything, emit items from C (if any)
 *
 * @param <T>
 */
public class ConcatObservable<T> {
  private final List<Observable<? extends T>> observables;
  private ConcatObservable(List<Observable<? extends T>> observables) {
    this.observables = observables;
  }
  public static <T> ConcatObservable<T> from(Observable<? extends T>... observables) {
    return new ConcatObservable<T>(Arrays.asList(observables));
  }
  public Observable<T> asObservable() {
    final List<Subscription> subscriptions = new CopyOnWriteArrayList<Subscription>();
    return Observable.create(new Observable.OnSubscribe<T>() {
      @Override public void call(final Subscriber<? super T> subscriber) {
        List<Observable<? extends T>> cachedObservables = new ArrayList<Observable<? extends T>>();
        for (Observable<? extends T> observable : observables) {
          // tell it to cache values
          final ReplaySubject<T> subject = ReplaySubject.create();
          cachedObservables.add(subject);
          // run it with nobody listening
          Subscription subscription = observable.subscribe(new Observer<T>() {
            @Override public void onCompleted() {
              subject.onCompleted();
            }
            @Override public void onError(Throwable e) {
              subject.onError(e);
            }
            @Override public void onNext(T item) {
              subject.onNext(item);
            }
          });
          subscriptions.add(subscription);
        }
        final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
        // for the cached ones, already running
        for (Observable<? extends T> observable : cachedObservables) {
          final AtomicBoolean shouldExit = new AtomicBoolean(false);
          final CountDownLatch latch = new CountDownLatch(1);
          Subscription subscription = observable.subscribe(new Observer<T>() {
            @Override public void onCompleted() {
              latch.countDown();
            }
            @Override public void onError(Throwable e) {
              error.set(e);
              shouldExit.set(true);
              latch.countDown();
            }
            @Override public void onNext(T item) {
              subscriber.onNext(item);
              shouldExit.set(true);
            }
          });
          // Track each subscription
          subscriptions.add(subscription);
          try {
            // Wait for this one to stop emitting, or error
            latch.await();
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for subscription to complete", e);
          }
          // This one had an item(s), so we don't bother with the rest
          if (shouldExit.get()) {
            break;
          }
        }
        // Release inner subscriptions
        for (Subscription subscription : subscriptions) {
          subscription.unsubscribe();
        }
        // Obey the Observable contract...
        Throwable throwable = error.get();
        if (throwable != null) {
          subscriber.onError(throwable);
        } else {
          subscriber.onCompleted();
        }
      }
    });
  }
}

以下是相应的测试:

/**
 * Tests for {@code ConcatObservable}: only the first non-empty source may emit,
 * and every source must be subscribed to.
 */
public class ConcatObservableTest {
  /** A is non-empty, so only A's items are expected. */
  @Test @SuppressWarnings("unchecked")
  public void it_onlyEmitsFromFirstObservable() {
    Observable<String> A = Observable.from(Arrays.asList("A", "A", "A"));
    Observable<String> B = Observable.from(Arrays.asList("B", "B", "B"));
    Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));
    Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();
    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);
    assertThat(testSubscriber.getOnNextEvents()).containsExactly("A", "A", "A");
  }
  /** A is empty, so the cascade falls through to B and stops there. */
  @Test @SuppressWarnings("unchecked")
  public void it_onlyEmitsFromSecondObservable() {
    Observable<String> A = Observable.empty();
    Observable<String> B = Observable.from(Arrays.asList("B", "B", "B"));
    Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));
    Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();
    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);
    assertThat(testSubscriber.getOnNextEvents()).containsExactly("B", "B", "B");
  }
  /** A and B are empty, so only C's items are expected. */
  @Test @SuppressWarnings("unchecked")
  public void it_onlyEmitsFromLastObservable() {
    Observable<String> A = Observable.empty();
    Observable<String> B = Observable.empty();
    Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));
    Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();
    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);
    assertThat(testSubscriber.getOnNextEvents()).containsExactly("C", "C", "C");
  }
  /** Every source must be subscribed to, even though only the first one emits downstream. */
  @Test @SuppressWarnings("unchecked")
  public void it_shouldStartAllObservables() {
    TestObservable<String> letters = TestObservable.createTestObservable("A", "B", "C");
    TestObservable<String> numbers = TestObservable.createDelayedTestObservable(100, "1", "2", "3");
    TestObservable<String> animals = TestObservable.createDelayedTestObservable(200, "zebra", "donkey", "unicorn");
    Observable<String> observable = ConcatObservable.from(letters, numbers, animals).asObservable();
    TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
    observable.subscribe(testSubscriber);
    assertThat(letters.isCalled()).isTrue();
    assertThat(numbers.isCalled()).isTrue();
    assertThat(animals.isCalled()).isTrue();
  }
  /** An Observable that records whether its OnSubscribe function has been invoked. */
  static class TestObservable<T> extends Observable<T> {
    private final TestOnSubscribe<T> onSubscribeFunc;
    private TestObservable(TestOnSubscribe<T> f) {
      super(f);
      onSubscribeFunc = f;
    }
    /** @return true once the underlying OnSubscribe function has been called */
    public boolean isCalled() {
      return onSubscribeFunc.isCalled();
    }
    /** Creates a test Observable that emits the items without any delay. */
    @SuppressWarnings("unchecked")
    public static <T> TestObservable<T> createTestObservable(final T... items) {
      return createDelayedTestObservable(0, items);
    }
    /**
     * Creates a test Observable that sleeps before each item.
     *
     * @param delay milliseconds to sleep before each emission; 0 or less means no sleep
     * @param items the items to emit, in order
     */
    @SuppressWarnings("unchecked")
    public static <T> TestObservable<T> createDelayedTestObservable(final long delay, final T... items) {
      return new TestObservable<T>(new TestOnSubscribe<T>(delay, items));
    }
    /** Emits the configured items on the subscribing thread, then completes. */
    private static class TestOnSubscribe<T> implements OnSubscribe<T> {
      private final long delay;
      private final T[] items;
      private boolean isCalled;
      private TestOnSubscribe(long delay, T... items) {
        this.delay = delay;
        this.items = items;
      }
      @Override public void call(Subscriber<? super T> subscriber) {
        isCalled = true;
        for (T item : items) {
          if (delay > 0) {
            sleep(delay);
          }
          subscriber.onNext(item);
        }
        subscriber.onCompleted();
      }
      public boolean isCalled() {
        return isCalled;
      }
      private void sleep(long time) {
        try {
          Thread.sleep(time);
        } catch (InterruptedException e) {
          // Do not swallow the interrupt: restore the flag so callers can observe it.
          Thread.currentThread().interrupt();
        }
      }
    }
  }
}
/**
 * Cascading variant of {@code Observable.concat}: every source is started
 * eagerly through {@code replay()} and {@code connect()}, but only the first
 * source that emits anything is forwarded downstream.
 *
 * If a source completes without emitting, the cascade falls through to the
 * next one. If every source is empty, the result completes without items.
 *
 * @param <T> the type of item delivered to the downstream subscriber
 */
public class ConcatObservable<T> {
    private final List<Observable<? extends T>> observables;

    private ConcatObservable(List<Observable<? extends T>> observables) {
        this.observables = observables;
    }

    /**
     * Creates a cascade over the given sources, in priority order.
     *
     * @param observables the sources; earlier ones take precedence over later ones
     * @return a new cascade wrapper
     */
    public static <T> ConcatObservable<T> from(Observable<? extends T>... observables) {
        return new ConcatObservable<T>(Arrays.asList(observables));
    }

    /**
     * Builds the cascading Observable.
     *
     * @return an Observable emitting all items of the first non-empty source
     */
    public Observable<T> asObservable() {
        return Observable.create(new Observable.OnSubscribe<T>() {
            @Override
            public void call(final Subscriber<? super T> subscriber) {
                // Start from an empty stream and chain each replayed source as the fallback
                // of the previous one. A plain concat(...).take(1) would stop after the first
                // ITEM; switchIfEmpty only moves on when the previous source had no items.
                // NOTE(review): needs an RxJava 1.x release that provides switchIfEmpty.
                Observable<T> cascade = Observable.<T>empty();
                for (Observable<? extends T> observable : observables) {
                    ConnectableObservable<? extends T> replayedObservable = observable.replay();
                    // Connect right away so the source runs and is cached before it is
                    // consumed; the connection is released when the subscriber unsubscribes.
                    subscriber.add(replayedObservable.connect());
                    cascade = cascade.switchIfEmpty(replayedObservable);
                }
                Subscription s = cascade.subscribe(subscriber);
                subscriber.add(s);
            }
        });
    }
}

编辑

这很接近,但未通过以下测试:

/** The first source is non-empty, so only its items may reach the subscriber. */
@Test @SuppressWarnings("unchecked")
public void it_onlyEmitsFromFirstObservable() {
  // Three synchronous sources, each emitting its own letter three times.
  Observable<String> first = Observable.from(Arrays.asList("A", "A", "A"));
  Observable<String> second = Observable.from(Arrays.asList("B", "B", "B"));
  Observable<String> third = Observable.from(Arrays.asList("C", "C", "C"));

  TestSubscriber<String> recorder = new TestSubscriber<String>();
  ConcatObservable<String> cascade = ConcatObservable.from(first, second, third);
  cascade.asObservable().subscribe(recorder);

  assertThat(recorder.getOnNextEvents()).containsExactly("A", "A", "A");
}

相关内容

  • 没有找到相关文章

最新更新