我有一个数据库,可以在RxJava
链工作时进行更新。
RxJava
链如下所示:
Flowable.generate(/*Query database for 1 item*/)
.subscribeOn(Schedulers.single())
.concatMap(/*Mapping*/)
.concatMap(/*Mapping*/)
.subscribe(/*Consumers*/)
数据库可以在RxJava
链处理排放的过程中进行更新。因此,Flowable.generate()
应该一次请求一个项目,并等到 onNext(( 消费者完成,并且只有在查询下一项之后。
是否可以仅在调用onNext之后才懒惰地查询Flowable.generate((中的项目?
No.generate
将根据下游需求生成物料。如果可以控制需求,则可以控制生成物料的时间和数量。但是,大多数时候,尤其是那些concatMap
,您将无法获得如此精细的控制。另一种方法是通过主题/处理器创建反馈循环,并将其项目映射到数据库的异步查询:
FlowableProcessor<Integer> processor = PublishProcessor.<Integer>create().toSerialized();
processor
.concatMap(_ ->
getNextSingleItemAsync().subscribeOn(Schedulers.single())
)
.concatMap(/*Mapping*/)
.concatMap(/*Mapping*/)
.subscribe(item -> {
/* Process item. */
// get to the next item
processor.onNext(1);
});
processor.onNext(0); // start the loop