我有 2 个数据源:数据库(缓存(和 api,我需要将它们组合成一个流。我知道我可以简单地使用 concatArray 或类似的东西,但我想实现更复杂的行为:
-
可观察的流,最多会发出 2 个元素。
-
它将在开始时订阅这两个来源。
-
如果 api 调用足够快(<~300ms(,它将只从中发出数据并完成流。
- 如果 api 调用速度很慢(>~300ms(,则从数据库发出数据并等待来自 API 的数据
- 如果 api 调用不成功,则从数据库发出数据并发出错误。
- 如果数据库以某种方式比 api 慢,它就无法发出其数据(流完成解决了它(
我使用以下代码完成了它:
public Observable<Entity> getEntity() {
final CompositeDisposable disposables = new CompositeDisposable();
return Observable.<Entity>create(emitter -> {
final Entity[] localEntity = new Entity[1];
//database call:
disposables.add(database.getEntity()
.subscribeOn(schedulers.io())
.doOnSuccess(entity -> localEntity[0] = entity) //saving our entity because
//apiService can emit error before 300 ms
.delay(300, MILLISECONDS)
.subscribe((entity, throwable) -> {
if (entity != null && !emitter.isDisposed()) {
emitter.onNext(entity);
}
}));
//network call:
disposables.add(apiService.getEntity()
.subscribeOn(schedulers.io())
.onErrorResumeNext(throwable -> {
return Single.<Entity>error(throwable) //we will delay error here
.doOnError(throwable1 -> {
if (localEntity[0] != null) emitter.onNext(localEntity[0]); //api error, emit localEntity
})
.delay(200, MILLISECONDS, true); //to let it emit localEntity before emitting error
})
.subscribe(entity -> {
emitter.onNext(entity);
emitter.onComplete(); //we got entity from api, so we can complete the stream
}, emitter::onError));
})
.doOnDispose(disposables::clear)
.subscribeOn(schedulers.io());
}
代码有点笨拙,我在这里创建可观察量内部的可观察量,我认为这很糟糕。但这样我就可以全局访问发射器,这允许我以我想要的方式控制主流(发出数据、成功、错误(。
有没有更好的方法来实现这一目标?我很想看看一些代码示例。谢谢!
可能是下面的代码可以完成这项工作。根据您的要求,我假设 api 和数据库处理Single<Entity>
.
private static final Object STOP = new Object();
public static void main(String[] args) {
Database database = new Database(Single.just(new Entity("D1")));
ApiService apiService = new ApiService(Single.just(new Entity("A1")));
// ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(500, MILLISECONDS));
// ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));
BehaviorSubject<Object> subject = BehaviorSubject.create();
Observable.merge(
apiService.getEntity()
.toObservable()
.doOnNext(t -> subject.onNext(STOP))
.doOnError(e -> subject.onNext(STOP))
.onErrorResumeNext(t ->
Observable.concatDelayError(database.getEntity().toObservable(),
Observable.error(t))),
database.getEntity()
.delay(300, MILLISECONDS)
.toObservable()
.takeUntil(subject)
)
.subscribe(System.out::println,
System.err::println);
Observable.timer(1, MINUTES) // just for blocking the main thread
.toBlocking()
.subscribe();
}
由于"如果数据库以某种方式比 api 慢,它无法发出其数据"和"如果 api 调用会很慢(>~300ms(,我无法删除Subject
的使用,从数据库发出数据并仍然等待来自 api 的数据"。否则,amb()
运算符将是一个很好的用途。
我希望这有所帮助。
另一个解决方案可能是这个(没有主题(:
public static void main(String[] args) throws InterruptedException {
Database database = new Database(Single.just(new Entity("D1")));
ApiService apiService = new ApiService(Single.just(new Entity("A1")));
// ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(400, MILLISECONDS));
// ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));
database.getEntity()
.toObservable()
.groupJoin(apiService.getEntity()
.toObservable()
.onErrorResumeNext(
err -> Observable.concatDelayError(database.getEntity().toObservable(),
Observable.error(err))),
dbDuration -> Observable.timer(300, MILLISECONDS),
apiDuration -> Observable.never(),
(db, api) -> api.switchIfEmpty(Observable.just(db)))
.flatMap(o -> o)
.subscribe(System.out::println,
Throwable::printStackTrace,
() -> System.out.println("It's the end!"));
Observable.timer(1, MINUTES) // just for blocking the main thread
.toBlocking()
.subscribe();
}
如果在 300 毫秒 (dbDuration -> timer(300, MILLISECONDS)
( 内未从 API 服务发出任何内容,则会从数据库发出实体 (api.switchIfEmpty(db)
(。
如果 api 在 300 毫秒内发出一些东西,那么它只发出它的Entity
(api.switchIfEmpty(.)
(。
这似乎也可以如您所愿地工作...
另一个更好的解决方案:
public static void main(String[] args) throws InterruptedException {
Database database = new Database(Single.just(new Entity("D1")));
ApiService apiService = new ApiService(Single.just(new Entity("A1")));
// ApiService apiService = new ApiService(Single.just(new Entity("A1")).delay(400, MILLISECONDS));
// ApiService apiService = new ApiService(Single.error(new Exception("Error! Error!")));
Observable<Entity> apiServiceWithDbAsBackup =
apiService.getEntity()
.toObservable()
.onErrorResumeNext(err ->
Observable.concatDelayError(database.getEntity().toObservable(), Observable.error(err)));
Observable.amb(database.getEntity()
.toObservable()
.delay(300, MILLISECONDS)
.concatWith(apiServiceWithDbAsBackup),
apiServiceWithDbAsBackup)
.subscribe(System.out::println,
Throwable::printStackTrace,
() -> System.out.println("It's the end!"));
我们使用延迟的amb()
来使数据库可观察,以采取第一个将发出。如果 api 服务出错,我们会从数据库发出项目。 这似乎也可以如您所愿地工作...