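I have a graph of interdependent asynchronous operations modeled with RxJava. For some errors, the entire graph should be re-run. The retry(..) operators do not directly support this, because any error is delivered to all subscribers. Since the retry(..) operators simply re-subscribe, they always receive the computed-once error from the final observable. That is, the work is not performed again on re-subscription.
I tried creating a special observable that invokes the observable-generating method on every subscription. With that in place the retry operators mostly work, and after adding a cache operator they behave exactly as desired.
However, this need seems so common that I suspect I am duplicating something RxJava already provides. I am also concerned about the robustness of my solution, given that I am attempting something low-level, possibly without enough RxJava knowledge to do it properly. Another issue is composability: to support all three retry(..) forms, I would need three versions of my wrapper method.
The demonstration below shows what I am trying to do and how far I have got.
Is there a simpler or more idiomatic way (or both) to do this kind of retry in RxJava?
package demo;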
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Subscriber;
import rx.functions.Func0;
import rx.util.async.Async;
/**
* <p>
* Demonstrate attempts to get RxJava retry for asynchronous work chain. The use
* case that exposed this problem is reading and writing data with versioning
* for optimistic concurrency. The work is a series of async I/O operations that
* must be re-assembled from scratch if a stale version is detected on write.
* </p>
*
* <p>
* Four cases are demonstrated in this class:
* </p>
* <ul>
* <li>Case 1: perform the work and naively apply a retry operator to the
* asynchronous work. This fails because the work itself is not retried on
* re-subscribe.</li>
* <li>Case 2: wrap the work in an observable that performs it on every
* subscription. A retry operator applied to the wrapper correctly re-attempts
* the work on failure. However, every subsequent subscriber to the result
* causes the work to be performed again.</li>
* <li>Case 3: Apply the cache operator to the result of the retry operator.
* This performs as desired.</li>
* <li>Case 4: Generalize the approach of case 3 and encapsulate it in an
* observable generator method. This shows that it is difficult to generalize
* this behavior because each retry operator form (number, predicate, perpetual)
* will require its own generator method.</li>
* </ul>
*
* <p>
* NOTE: this code does not work if compiled by the Eclipse (Kepler) compiler
* for Java 8. I have to compile with javac for it to work. There is some
* problem with Lambda class naming in the code generated by Eclipse.
* </p>
*
*
*/
public class AsyncRetryDemo {
public static void main(final String[] args) throws Exception {
new AsyncRetryDemo().case1();
new AsyncRetryDemo().case2();
new AsyncRetryDemo().case3();
new AsyncRetryDemo().case4();
// output is:
//
// case 1, sub 1: fail (max retries, called=1)
// case 1, sub 2: fail (max retries, called=1)
// case 2, sub 1: pass (called=2)
// case 2, sub 2: fail (called=3)
// case 3, sub 1: pass (called=2)
// case 3, sub 2: pass (called=2)
// case 4, sub 1: pass (called=2)
// case 4, sub 2: pass (called=2)
}
private final AtomicInteger called = new AtomicInteger();
private final CountDownLatch done = new CountDownLatch(2);
/**
* This represents a sequence of interdependent asynchronous operations that
* might fail in a way that prescribes a retry (but in this case, all we are
* doing is squaring an integer asynchronously and failing the first time).
*
* @param a
* input to the process.
*
* @return promise to perform the work and produce either a result or a
* suggestion to retry (e.g. a stale version error).
*/
private Observable<Integer> canBeRetried(final int a) {
final Observable<Integer> rval;
if (this.called.getAndIncrement() == 0) {
rval = Observable.error(new RuntimeException(
"we always fail the first time"));
} else {
rval = Async.start(() -> a * a);
}
return rval;
}
private void case1() throws InterruptedException {
/*
* In this case, we invoke the observable-creator to get the async
* promise. Of course, if it fails, any retry will fail as well because
* the failed result is computed one time and pushed to all subscribers
* forever.
*
* Thus this case fails because the first invocation of canBeRetried(..)
* always fails.
*/
final Observable<Integer> o = canBeRetried(2)
.retry(2);
check("case 1", o);
this.done.await();
}
private void case2() throws InterruptedException {
/*
* In this case, we wrap canBeRetried(..) inside an observable that
* invokes it on every subscription. So, we get past the retry problem.
* But every new subscriber after the retry succeeds causes the work to
* restart.
*/
final Observable<Integer> o = Observable.create(
new OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> child) {
canBeRetried(2).subscribe(child);
}
})
.retry(2);
check("case 2", o);
this.done.await();
}
private void case3() throws InterruptedException {
/*
* In this case, we wrap canBeRetried(..) inside an observable that
* invokes it on every subscription. So, we get past the retry problem.
* We cache the result of the retry to solve the extra work problem.
*/
final Observable<Integer> o = Observable.create(
new OnSubscribe<Integer>() {
@Override
public void call(final Subscriber<? super Integer> child) {
canBeRetried(2).subscribe(child);
}
})
.retry(2)
.cache();
check("case 3", o);
this.done.await();
}
private void case4() throws InterruptedException {
/*
* Same as case 3 but we use the retryAndCache(..) to do the work for
* us.
*/
final Observable<Integer> o = retryAndCache(() -> canBeRetried(2), 2);
check("case 4", o);
this.done.await();
}
private void check(final String label, final Observable<Integer> promise) {
// does the work get retried on failure?
promise.subscribe(
v -> {
System.out.println(label + ", sub 1: "
+ (this.called.get() == 2 ? "pass" : "fail")
+ " (called=" + this.called.get() + ")");
},
x -> {
System.out.println(label
+ ", sub 1: fail (max retries, called="
+ this.called.get() + ")");
this.done.countDown();
}, () -> {
this.done.countDown();
});
// do subsequent subscribers avoid invoking the work again?
promise.subscribe(
v -> {
System.out.println(label + ", sub 2: "
+ (this.called.get() == 2 ? "pass" : "fail")
+ " (called=" + this.called.get() + ")");
},
x -> {
System.out.println(label
+ ", sub 2: fail (max retries, called="
+ this.called.get() + ")");
this.done.countDown();
}, () -> {
this.done.countDown();
});
}
/**
* Generalized retry and cache for case 4.
*
* @param binder
* user-provided supplier that assembles and starts the
* asynchronous work.
*
* @param retries
* number of times to retry on error.
*
* @return promise to perform the work and retry up to {@code retries} times on error.
*/
private static <R> Observable<R> retryAndCache(
final Func0<Observable<R>> binder, final int retries) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(final Subscriber<? super R> child) {
binder.call().subscribe(child);
}
})
.retry(retries)
.cache();
}
}
You actually have a few options for doing this better.
The first option is to use defer instead of create:
private void case5() throws InterruptedException {
// Same as case 3 but using defer
final Observable<Integer> o = Observable.defer(() -> canBeRetried(2)).retry(2).cache();
check("case 5", o);
this.done.await();
}
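However, the real problem lies in the canBeRetried method itself: it does its work when it is called rather than when its result is subscribed to, so it has to be called again on every retry. A better approach is to return an Observable that re-executes the logic for each subscription. The method might look like this: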
private Observable<Integer> canBeRetriedBetter(final int a) {
return Observable.defer(() -> canBeRetried(a));
}
And the calling code:
private void case6() throws InterruptedException {
final Observable<Integer> o = canBeRetriedBetter(2).retry(2).cache();
check("case 6", o);
this.done.await();
}
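To see why deferring makes the difference, here is a minimal sketch of the same mechanism using only JDK types rather than RxJava. Everything in it (the DeferSketch class, the retry helper, and the use of CompletableFuture and Supplier as stand-ins for Observable and defer) is an assumption made for illustration, not part of the RxJava API. Retrying an already-created result just observes the same failure again, while retrying a factory rebuilds the work on each attempt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class DeferSketch {
    static final AtomicInteger called = new AtomicInteger();

    // Mirrors canBeRetried(..): fails on the first invocation, squares afterwards.
    static CompletableFuture<Integer> canBeRetried(int a) {
        if (called.getAndIncrement() == 0) {
            CompletableFuture<Integer> failed = new CompletableFuture<>();
            failed.completeExceptionally(
                    new RuntimeException("we always fail the first time"));
            return failed;
        }
        return CompletableFuture.supplyAsync(() -> a * a);
    }

    // Hypothetical helper: asks the source for a result, and asks again on
    // failure, up to `retries` additional times.
    static <T> T retry(Supplier<CompletableFuture<T>> source, int retries) {
        CompletionException last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return source.get().join();
            } catch (CompletionException e) {
                last = e; // remember the failure and try again
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        // Eager: the work ran once, so every attempt sees the same failure.
        CompletableFuture<Integer> eager = canBeRetried(2);
        try {
            retry(() -> eager, 2);
        } catch (CompletionException e) {
            System.out.println("eager: fail (called=" + called.get() + ")");
        }

        called.set(0);
        // Deferred: the factory runs on every attempt, so the work is redone.
        Integer value = retry(() -> canBeRetried(2), 2);
        System.out.println("deferred: " + value + " (called=" + called.get() + ")");
    }
}
```

The eager branch corresponds to case 1 in the question, and the deferred branch to what Observable.defer does on each subscription.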
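You can improve on this further with compose and a custom Transformer. With them you can apply a set of commonly used operators to any chain in a consistent, reusable way.
For example, you can define a transformer that applies the retry and cache operators to a stream: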
public static class RetryAndCache<T> implements Observable.Transformer<T, T>{
private final int count;
public RetryAndCache(int count) {
this.count = count;
}
@Override
public Observable<T> call(Observable<T> o) {
return o.retry(count).cache();
}
}
Finally, the calling code:
private void case7() throws InterruptedException {
final Observable<Integer> o = canBeRetriedBetter(2).compose(new RetryAndCache<Integer>(2));
check("case 7", o);
this.done.await();
}
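On the composability concern from the question (one wrapper per retry form), one possible direction is to pass the retry decision in as a function, so that a single wrapper covers the count-based, predicate-based, and perpetual forms. The sketch below shows that idea with JDK types only. The names RetryPolicySketch, retrying, and failsOnce are hypothetical, and CompletableFuture stands in for a cached Observable, so treat it as an illustration of the design rather than RxJava code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

public class RetryPolicySketch {

    // Hypothetical helper: builds the work via work.get(); on failure, asks the
    // policy (attempt number, error) whether to rebuild it from scratch.
    static <T> CompletableFuture<T> retrying(
            Supplier<CompletableFuture<T>> work,
            BiPredicate<Integer, Throwable> shouldRetry) {
        return attempt(work, shouldRetry, 1);
    }

    private static <T> CompletableFuture<T> attempt(
            Supplier<CompletableFuture<T>> work,
            BiPredicate<Integer, Throwable> shouldRetry, int n) {
        return work.get().handle((value, error) -> {
            CompletableFuture<T> next;
            if (error == null) {
                next = CompletableFuture.completedFuture(value);
            } else if (shouldRetry.test(n, error)) {
                next = attempt(work, shouldRetry, n + 1); // rebuild the work
            } else {
                next = new CompletableFuture<>();
                next.completeExceptionally(error);        // give up
            }
            return next;
        }).thenCompose(f -> f);
    }

    // Work that fails on its first invocation and squares the input afterwards.
    static Supplier<CompletableFuture<Integer>> failsOnce(int a, AtomicInteger called) {
        return () -> {
            if (called.getAndIncrement() == 0) {
                CompletableFuture<Integer> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException("stale version"));
                return failed;
            }
            return CompletableFuture.supplyAsync(() -> a * a);
        };
    }

    public static void main(String[] args) {
        // Count-based: retry while the failed attempt number is at most 2.
        System.out.println(
                retrying(failsOnce(2, new AtomicInteger()), (n, e) -> n <= 2).join());
        // Predicate-based: retry only for a specific error type.
        System.out.println(
                retrying(failsOnce(3, new AtomicInteger()),
                        (n, e) -> e instanceof IllegalStateException).join());
        // Perpetual: always retry.
        System.out.println(
                retrying(failsOnce(4, new AtomicInteger()), (n, e) -> true).join());
    }
}
```

The returned future holds its outcome once it completes, so later callers that share it do not trigger the work again, which is roughly the role cache() plays in the RxJava version. In RxJava itself the analogous move would be a Transformer that takes the retry step as a parameter instead of a fixed count.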