假设我们有3个可观察性,A、B和C。我需要同时运行所有3个(对于外行来说,异步(,但是:
- 如果我从A得到任何东西,请发射它…不要发射任何其他东西
- 如果A完成而不发出任何内容,请将规则1应用于B
- 如果B完成而不发射任何东西,则从C发射项目
- 如果C完成而不发出任何内容,则发出默认项
昨天我花了几个小时试图弄清楚这一点,RxJava中似乎还没有任何操作组合可以让我做到这一点。
你可以想到从左到右级联的值:
A-->B-->C
此外,级联被阻止,而每个级联都运行async并缓存其值。
A(无(-->B(无(-->C(无(->默认项
为了清楚起见,A必须在任何其他观察者发出任何东西之前完成。如果A、B、C不能发射任何东西,那么B和C的逻辑是相同的,然后是默认的。
显然,这涉及到缓存,我绝对不想回放可观察到的内容。我需要重播缓存的值。每个门上都挂着。
该行为与concat((非常相似,只是如果链的下一部分之前有排放,则不会释放。
以下是我的想法:
**
* Works like {@link rx.Observable#concat} but concatenated Observables
* are all run immediately on their given {@link rx.Scheduler}.
*
* This Observable is blocking in the sense that items are emitted in order
* like {@link rx.Observable#concat} but since each Observable is run on
* an (possibly) asynchronous scheduler, items emitted further down the chain
* of Observables are held until items further up the chain are (possibly) emitted.
*
* This Observable also short-circuits and does not emit items further down
* the chain of Observables when an Observable higher up the chain emits items.
*
* For example:
*
* Given Observable A, B, and C
*
* If A emits item(s) emit them... do not emit anything else.
* If A completes without emitting anything, apply previous rule to B.
* If B completes without emitting anything, emit items from C (if any)
*
* @param <T>
*/
public class ConcatObservable<T> {
private final List<Observable<? extends T>> observables;
private ConcatObservable(List<Observable<? extends T>> observables) {
this.observables = observables;
}
public static <T> ConcatObservable<T> from(Observable<? extends T>... observables) {
return new ConcatObservable<T>(Arrays.asList(observables));
}
public Observable<T> asObservable() {
final List<Subscription> subscriptions = new CopyOnWriteArrayList<Subscription>();
return Observable.create(new Observable.OnSubscribe<T>() {
@Override public void call(final Subscriber<? super T> subscriber) {
List<Observable<? extends T>> cachedObservables = new ArrayList<Observable<? extends T>>();
for (Observable<? extends T> observable : observables) {
// tell it to cache values
final ReplaySubject<T> subject = ReplaySubject.create();
cachedObservables.add(subject);
// run it with nobody listening
Subscription subscription = observable.subscribe(new Observer<T>() {
@Override public void onCompleted() {
subject.onCompleted();
}
@Override public void onError(Throwable e) {
subject.onError(e);
}
@Override public void onNext(T item) {
subject.onNext(item);
}
});
subscriptions.add(subscription);
}
final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
// for the cached ones, already running
for (Observable<? extends T> observable : cachedObservables) {
final AtomicBoolean shouldExit = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1);
Subscription subscription = observable.subscribe(new Observer<T>() {
@Override public void onCompleted() {
latch.countDown();
}
@Override public void onError(Throwable e) {
error.set(e);
shouldExit.set(true);
latch.countDown();
}
@Override public void onNext(T item) {
subscriber.onNext(item);
shouldExit.set(true);
}
});
// Track each subscription
subscriptions.add(subscription);
try {
// Wait for this one to stop emitting, or error
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("Interrupted while waiting for subscription to complete", e);
}
// This one had an item(s), so we don't bother with the rest
if (shouldExit.get()) {
break;
}
}
// Release inner subscriptions
for (Subscription subscription : subscriptions) {
subscription.unsubscribe();
}
// Obey the Observable contract...
Throwable throwable = error.get();
if (throwable != null) {
subscriber.onError(throwable);
} else {
subscriber.onCompleted();
}
}
});
}
}
以下是相应的测试:
public class ConcatObservableTest {
@Test @SuppressWarnings("unchecked")
public void it_onlyEmitsFromFirstObservable() {
Observable<String> A = Observable.from(Arrays.asList("A", "A", "A"));
Observable<String> B = Observable.from(Arrays.asList("B", "B", "B"));
Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));
Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
observable.subscribe(testSubscriber);
assertThat(testSubscriber.getOnNextEvents()).containsExactly("A", "A", "A");
}
@Test @SuppressWarnings("unchecked")
public void it_onlyEmitsFromSecondObservable() {
Observable<String> A = Observable.empty();
Observable<String> B = Observable.from(Arrays.asList("B", "B", "B"));
Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));
Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
observable.subscribe(testSubscriber);
assertThat(testSubscriber.getOnNextEvents()).containsExactly("B", "B", "B");
}
@Test @SuppressWarnings("unchecked")
public void it_onlyEmitsFromLastObservable() {
Observable<String> A = Observable.empty();
Observable<String> B = Observable.empty();
Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));
Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
observable.subscribe(testSubscriber);
assertThat(testSubscriber.getOnNextEvents()).containsExactly("C", "C", "C");
}
@Test @SuppressWarnings("unchecked")
public void it_shouldStartAllObservables() {
TestObservable<String> letters = TestObservable.createTestObservable("A", "B", "C");
TestObservable<String> numbers = TestObservable.createDelayedTestObservable(100, "1", "2", "3");
TestObservable<String> animals = TestObservable.createDelayedTestObservable(200, "zebra", "donkey", "unicorn");
Observable<String> observable = ConcatObservable.from(letters, numbers, animals).asObservable();
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
observable.subscribe(testSubscriber);
assertThat(letters.isCalled()).isTrue();
assertThat(numbers.isCalled()).isTrue();
assertThat(animals.isCalled()).isTrue();
}
static class TestObservable<T> extends Observable<T> {
private final TestOnSubscribe<T> onSubscribeFunc;
private TestObservable(TestOnSubscribe<T> f) {
super(f);
onSubscribeFunc = f;
}
public boolean isCalled() {
return onSubscribeFunc.isCalled();
}
@SuppressWarnings("unchecked")
public static <T> TestObservable<T> createTestObservable(final T... items) {
return createDelayedTestObservable(0, items);
}
@SuppressWarnings("unchecked")
public static <T> TestObservable<T> createDelayedTestObservable(final long delay, final T... items) {
return new TestObservable<T>(new TestOnSubscribe<T>(delay, items));
}
private static class TestOnSubscribe<T> implements OnSubscribe<T> {
private final long delay;
private final T[] items;
private boolean isCalled;
private TestOnSubscribe(long delay, T... items) {
this.delay = delay;
this.items = items;
}
@Override public void call(Subscriber<? super T> subscriber) {
isCalled = true;
for (T item : items) {
if (delay > 0) {
sleep(delay);
}
subscriber.onNext(item);
}
subscriber.onCompleted();
}
public boolean isCalled() {
return isCalled;
}
private void sleep(long time) {
try {
Thread.sleep(time);
} catch (InterruptedException e) { }
}
}
}
}
public class ConcatObservable<T> {
private final List<Observable<? extends T>> observables;
private ConcatObservable(List<Observable<? extends T>> observables) {
this.observables = observables;
}
public static <T> ConcatObservable<T> from(Observable<? extends T>... observables) {
return new ConcatObservable<T>(Arrays.asList(observables));
}
public Observable<T> asObservable() {
return Observable.create(new Observable.OnSubscribe<T>() {
@Override
public void call(final Subscriber<? super T> subscriber) {
List<Observable<? extends T>> cachedObservables = new ArrayList<Observable<? extends T>>();
for (Observable<? extends T> observable : observables) {
ConnectableObservable<? extends T> replayedObservable = observable.replay();
cachedObservables.add(replayedObservable);
subscriber.add(replayedObservable.connect());
}
Subscription s = Observable.concat(Observable.from(cachedObservables)).take(1).subscribe(subscriber);
subscriber.add(s);
}
});
}
}
编辑
这很接近,但未通过以下测试:
@Test @SuppressWarnings("unchecked")
public void it_onlyEmitsFromFirstObservable() {
Observable<String> A = Observable.from(Arrays.asList("A", "A", "A"));
Observable<String> B = Observable.from(Arrays.asList("B", "B", "B"));
Observable<String> C = Observable.from(Arrays.asList("C", "C", "C"));
Observable<String> observable = ConcatObservable.from(A, B, C).asObservable();
TestSubscriber<String> testSubscriber = new TestSubscriber<String>();
observable.subscribe(testSubscriber);
assertThat(testSubscriber.getOnNextEvents()).containsExactly("A", "A", "A");
}