在 rxjava2 中的异步请求完成之前调用订阅者 onNext



我已经使用 RxJava2 在 MVP 中实现了存储库模式

远程数据源.java

public Observable<List<A>> getAList(){
return ApiService.
getAList()
.compose(RxUtils.applySchedulers())
.doOnSubscribe(disposable -> Timber.d(..))
.doOnError(throwable -> Timber.d(..))
.doOnComplete(() -> {
Timber.d(..);
});
}

本地数据源.java

public Observable<List<A>> getAList(){
return mDbHelper .....from SQLBrite..
}
public void saveAList(List<<A> a){
SQlBriteTransaction...
}

存储库.java(更新(

@Inject
public Repository(DownloadUtils downloadUtils){
this.mDownloadUtils = downloadUtils;
}
@Override
public Observable<List<A>> getAList(){
return  mRemoteDataSource
.getAList()
.flatMapIterable(List<A> -> a)
.flatMap(A a ->
************************************************************   
return Observable.fromIterable(a.getB())
.flatMap((Function<B, ObservableSource<B>>) b ->
Observable.create(emitter -> 
emitter.onNext(new 
DownloadUtils().downloadFiles(b,totalListCount,emitter))))
.toList()
.toObservable()
***************************************************
.toList()
.toObservable()
.doOnNext( List<A>  a -> {
--------------Only the first change in B value is inserted in Db-
mLocalDataSource.saveAList(a);
});
}

DownLoadUtils.java(更新(

void downloadBFiles(B b, int totalCount,ObservableEmitter<B> emitter){
fileCount = b.size;
b.get(index).setDataToChange(dataToChange);
*** I am using PR Downloader for aynchronous download using 
RECURSION **
PRDownloader.download(remoteUrl, filePath, fileName)
.build()
.setOnStartOrResumeListener(() -> {
})
.setOnProgressListener(progress -> {
int progressPercent = (int) (progress.currentBytes * 
100 / progress.totalBytes);,
})
.start(new OnDownloadListener() {
@Override
public void onDownloadComplete() {
********************* emitter.onComplete() ******************

@Override
public void onError(Error error) {
}   
}

主讲人.java

void getVideosFromRepo(){
disposable = mRepository
.getAList()
.doOnSubscribe(d _-> "Started Loading")
.subscribe(
//OnNext
------------- Here the OnNext is being called before Asynchronous Operation completes!!-------
List<A> a -> mView.setAList(a);
)

}

在演示器实现之上,甚至在异步下载完成之前,也会在演示器的下一个中返回列表...需要什么更改,以便在所有下载完成后调用onNext(订阅(!!

您在 RxJava 观察器链之外使用异步服务,因此 RxJava 无法管理传递的数据。由于downloadBFiles()使用单独的观察者链,可以这么说,您已经丢失了线程。

您需要使用flatMap()

以便下载结果包含在观察器链中,而不是使用doOnNext()来触发下载。

相关内容

  • 没有找到相关文章

最新更新