rxjava依赖性方法与async进行循环调用



我有多个我想用rxjava解决的依赖性调用:

  1. 获取所有具有尚未上传的文件的实体
  2. 上传文件到服务器
  3. 更新带有收到URL的实体
  4. 上传服务器的实体

我尝试了不同的方法,但不能使它起作用。我无法关注如何等到文件上传完成。这是我当前的代码:

Observable.fromIterable(repository.getFileNotUploaded()) // Returns a list of all entities that should be uploaded to the server
    .flatMap(entity ->
            restService.uploadFile(new File(directory.getPath(), entity.getLocalPath()))
                    .subscribe(fileUrl -> {
                        entity.setFileUrl(fileUrl);
                        repository.update(entity);
                    }));
// TODO: Wait until all files have been uploaded and the entities have been stored
// locally. Then upload the list of all entities. 

休息电话:

public Observable<String> uploadFile(File file) {
    return Observable.create(emitter -> {
        AsyncTask.execute(new Runnable() {
            @Override
            public void run() {
                commClient.sendFileRequest(URL, file,
                        response -> {
                        // ...
                        if(success){
                                emitter.onNext(new String(response.data));
                        }
                            emitter.onComplete();
                        }
            }
        });
    });
}

我还读到平面地图中的呼叫订阅是一种反图案。如何级联我的方法电话?我应该使用范围方法并返回迭代的当前声音吗?


编辑 - 感谢Emanuel S

伊曼纽尔的解决方案作品。我还必须更改我的休息服务。此服务必须返回Observable<Entity>而不是Observable<String>。另请注意,不应混合异步和Rx。

public Observable<Entity> uploadFile(File file, Entity entity) {
//...
entity.setFileUrl(fileUrl);
emitter.onNext(entity);
//...

如果repository.getFilenotuploaded()是一个arraylist/collection,您应该在以后使用公正和迭代创建您的观察。

这可能有效(未经测试,在没有IDE的情况下写)并将所有Entites上传到一堆中。

正如Akarnokd所写的那样,您不需要使用justflatmapIterable,因为您只能使用fromIterable

Observable.just(repository.getFileNotUploaded()).flatMapIterable ( files -> file)
// OR 
Observable.flatmapIterable(repository.getFileNotUploaded())
.flatMap(entity -> rs.uploadFile(new File(yourPath, entity.getLocalPath())) // for each file not uploaded run another observable and return it
.map(entity -> { entity.setFieldUrl(fileUrl); return entity; }) // modify the entity and return it
.toList() // get it back to List<String> with your entities
.flatMap(entityList -> uploadEntityObservable(entityList))
.subscribe( 
    success -> Log.d("Success", "All entities and files uploaded"), 
    error -> Log.e("Error", "error happened somewhere")
);

如果您想用一个调用每个修改的实体上传,则可能需要替换

.toList() // get it back to List<String> with your entities
.flatMap(entityList -> uploadEntityObservable(entityList ))

.flatMapIterable( singleEntity -> uploadSingleEntity(singleEntity))

不要与rxjava混合。如果您有rxjava,则不需要异步。

注意:如果您流式传输数据,如果您需要使用的存储库可观察到的数据,则需要使用

repository.getFileNotUploaded() // Observable<Whatever> emit data in a stream.
.flatMapIterable ( files -> file) ...

最新更新