RxJava - Using retryWhen on the parent observable of the failing observable



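My service uses Couchbase. To update a document, the service first performs a lookup to fetch the document together with its CAS, and then updates the document. If the update fails with a CASMismatchException, I want to retry both the lookup (an async request) and the update after a delay, using retryWhen. The problem is that the retry only re-invokes the update observable, not the whole lookup-and-update async sequence.

Here is the retry code: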

    public Func1<Observable<? extends Throwable>, Observable<?>> getRetryOnCASMismatchExceptionHandler(String unrecoverableErrorMessage) {
        return observable -> observable
                // Pair each error with its attempt number (1..maxAttempts)
                .zipWith(Observable.range(1, maxAttempts), ImmutablePair::of)
                .flatMap(pair -> {
                    var throwable = pair.left;
                    var attemptsCounter = pair.right;
                    if (throwable instanceof CASMismatchException) {
                        // Retry after a delay that grows with the attempt number
                        return Observable.timer((long) attemptsCounter * backoffMs, TimeUnit.MILLISECONDS);
                    }
                    // Any other error is not retried: pass it on, wrapped with the given message
                    return Observable.error(new RuntimeException(unrecoverableErrorMessage, throwable));
                });
    }
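
The handler above waits `attemptsCounter * backoffMs` before each retry, which is a linear backoff. As a rough, library-free sketch of the resulting schedule (the class `BackoffSchedule` and the values 3 and 100 are illustrative assumptions; the post does not show the actual `maxAttempts` or `backoffMs`):

```java
import java.util.ArrayList;
import java.util.List;

class BackoffSchedule {
    // Delay before each retry: attempt * backoffMs for attempt = 1..maxAttempts,
    // mirroring the error stream zipped with Observable.range(1, maxAttempts).
    static List<Long> delaysMs(int maxAttempts, long backoffMs) {
        List<Long> delays = new ArrayList<>();
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            delays.add((long) attempt * backoffMs);
        }
        return delays;
    }

    public static void main(String[] args) {
        // Hypothetical settings: 3 attempts, 100 ms base delay
        System.out.println(delaysMs(3, 100L)); // prints [100, 200, 300]
    }
}
```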

The update code:

    private Observable<String> updateDetectionWithRetry(DetectionFeed detectionFeed, String userId, String detectionPath) {
        return updateDetection(detectionFeed, userId, detectionPath)
                .retryWhen(retryHandlers.getRetryOnCASMismatchExceptionHandler(
                        "Failed to update persisted UserAccount with detection data [" + detectionFeed.toString() + "]"));
    }

    private Observable<String> updateDetection(DetectionFeed detectionFeed, String userId, String detectionPath) {
        return userRepo
                // Fetch the current document together with its CAS
                .lookupDetection(userId, detectionPath)
                .filter(existingDetection -> isNewDetectionFeed(existingDetection, detectionFeed))
                // Replace the document, passing the CAS obtained by the lookup
                .flatMap(detectionToPersist -> userRepo.replaceDetection(userId, detectionPath,
                        createDetectionToPersist(detectionFeed), detectionToPersist.getCas()));
    }
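
Wrap the call in `Observable.defer()`:

    Observable.defer(() -> updateDetection(detectionFeed, userId, detectionPath))

`Observable.defer()` invokes the wrapped method again for every new subscription. Since `retryWhen` retries by resubscribing to its source, each error now re-runs the whole process (lookup and replace) instead of only the replace.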
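
To illustrate why re-invoking the method matters, here is a library-free analogy rather than RxJava code. It assumes the lookup behaves like an eager, cached async result (a `CompletableFuture` stands in for it here); the class `DeferSketch` and its methods are hypothetical names made up for this sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class DeferSketch {
    // Counts how many times the (hypothetical) lookup actually runs.
    static final AtomicInteger lookups = new AtomicInteger();

    // Stand-in for an eager async lookup: the work happens when the method
    // is called, and the returned future caches its result.
    static CompletableFuture<Integer> lookupCas() {
        return CompletableFuture.completedFuture(lookups.incrementAndGet());
    }

    // Reads the source `attempts` times, as a retry loop would, and
    // returns the value seen on the last attempt.
    static int lastCasSeen(Supplier<CompletableFuture<Integer>> source, int attempts) {
        int cas = -1;
        for (int i = 0; i < attempts; i++) {
            cas = source.get().join();
        }
        return cas;
    }

    public static void main(String[] args) {
        // Without deferral: the lookup ran once, every retry sees the same cached value.
        CompletableFuture<Integer> eager = lookupCas();
        System.out.println(lastCasSeen(() -> eager, 3)); // prints 1

        // With deferral: the method is re-invoked per attempt, so each retry gets a fresh lookup.
        System.out.println(lastCasSeen(DeferSketch::lookupCas, 3)); // prints 4
    }
}
```

The `Supplier` plays the role that the lambda passed to `Observable.defer()` plays in the answer: nothing is looked up until an attempt asks for it, and every attempt asks again.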

The complete, corrected code:

    private Single<Optional<String>> updateUserAccountDetection(DetectionFeed detectionFeed, String userId, String detectionPath) {
        return Observable.defer(() -> updateDetection(detectionFeed, userId, detectionPath))
                .retryWhen(RetryBuilder
                        .anyOf(CASMismatchException.class)
                        .delay(Delay.fixed(1L, TimeUnit.SECONDS))
                        .max(3)
                        .build())
                .map(Optional::ofNullable)
                .toSingle();
    }