即使订阅了多个可观察对象,在RxJava中也只触发一个事件



我正在尝试理解RxJava。我写了一小段代码,它将上传一个列表文件到firebase服务器。我写了两个observable:1. ImageUploaderService.java:上传单个文件并通知状态2. java:创建新的单个文件上传器,并为其提供一个要上传的文件路径。收集所有上传者的结果,并将上传文件的链接通知调用者。

我的问题是,即使ImageUploaderService的每个实例都在AllImageUploaderService中订阅,只有OnNext的一个事件被触发。我该怎么修理它?

AllImageUploaderService.java

    public class AllImageUploaderService extends Subscriber<String>{
    private BehaviorSubject<String[]> uploadService;
    private ArrayList<String> uploadedFiles;
    private int expectedCount;
    private int actualCount;
    public Observable<String[]> uploadImages(final String[] fileNames, final StorageReference imagesRef,
        final String[] chosenImage) {
        expectedCount = fileNames.length;
        uploadedFiles = new ArrayList<>(fileNames.length);
        uploadService = BehaviorSubject.create();
        Observable<String[]> observable = Observable.create(new Observable.OnSubscribe<String[]>(){
            @Override
            public void call(Subscriber<? super String[]> subscriber) {
                for (int i = 0; i < fileNames.length; i++) {
                    ImageUploaderService imageUploaderService=new ImageUploaderService();
                    Observable<String> observable= imageUploaderService.uploadImage(fileNames[i],imagesRef,chosenImage[i]);
                    observable.subscribe(AllImageUploaderService.this);
                }
            }
        });
        observable.subscribe(uploadService);
        return uploadService;
    }
    @Override
    public void onCompleted() {
        actualCount++;
        if(expectedCount==actualCount) {
            String[] retValue = new String[uploadedFiles.size()];
            uploadedFiles.toArray(retValue);
            uploadService.onNext(retValue);
            uploadService.onCompleted();
        }
    }
    @Override
    public void onError(Throwable e) {
        Utils.logE(this,"Error",e);
    }
    @Override
    public void onNext(String s) {
        uploadedFiles.add(s);
    }
}

ImageUploaderService.java

public class ImageUploaderService implements OnFailureListener, OnSuccessListener<UploadTask.TaskSnapshot> {
    private BehaviorSubject<String> uploadSubject;
    public Observable<String> uploadImage(final String fileNames, final StorageReference imagesRef,
        final String chosenImage) {
        uploadSubject = BehaviorSubject.create();
        Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {               
                StorageReference spaceRef = imagesRef.child(fileNames);
                UploadTask uploadTask = spaceRef.putFile(UriUtil.generatorUri(chosenImage, UriUtil.LOCAL_FILE_SCHEME));
                uploadTask.addOnFailureListener(ImageUploaderService.this);
                uploadTask.addOnSuccessListener(ImageUploaderService.this);
            }
        });
        observable.subscribe(uploadSubject);
        return uploadSubject;
    }
    @Override
    public void onFailure(@NonNull Exception exception) {
        uploadSubject.onError(exception);
        uploadSubject.onCompleted();
    }
    @Override
    public void onSuccess(UploadTask.TaskSnapshot taskSnapshot) {
        Uri downloadUrl = taskSnapshot.getDownloadUrl();
        uploadSubject.onNext(downloadUrl.toString());
        uploadSubject.onCompleted();
    }
}

如果您能告诉我更好的方法,那就太好了。

不需要把事情弄得太复杂:

public Observable<String> uploadImages(final String[] fileNames, final StorageReference imagesRef,
    final String[] chosenImage) {
    return Observable
        .range(0, fileNames.length)
        .flatMap(i -> new ImageUploaderService()
             .uploadImage(fileNames[i],imagesRef,chosenImage[i]))
}

当一切结束时,你需要它作为一个集合吗?.toList() .

另外,我认为你不需要ImageUploaderService:

中的主题
public class ImageUploader implements OnFailureListener, OnSuccessListener<UploadTask.TaskSnapshot> {
  public static Observable<String> uploadImage(final String fileNames, final StorageReference imagesRef,
    final String chosenImage) {
    Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {               
            ImageUploader uploader = new ImageUploader(subscriber);
            StorageReference spaceRef = imagesRef.child(fileNames);
            UploadTask uploadTask = spaceRef.putFile(UriUtil.generatorUri(chosenImage, UriUtil.LOCAL_FILE_SCHEME));
            uploadTask.addOnFailureListener(uploader);
            uploadTask.addOnSuccessListener(uploader);
        }
    });
    return observable;
  }
  /////////
  private final Subscriber<String> subscriber;
  private ImageUploader(final Subscriber<String> subscriber) {
    this.subscriber = subscriber;
  }
  @Override
  public void onFailure(@NonNull Exception exception) {
    if(subscriber.isUnsubscribed()) return;
    subscriber.onError(exception);
  }
  @Override
  public void onSuccess(UploadTask.TaskSnapshot taskSnapshot) {
    if(subscriber.isUnsubscribed()) return;
    Uri downloadUrl = taskSnapshot.getDownloadUrl();
    uploadSubject.onNext(downloadUrl.toString());
    uploadSubject.onCompleted();
  }
}

注意,不要在onError()之后发出onComplete(),这两个都是可观察对象的终结状态,并且都应该是最后一个。

相关内容

  • 没有找到相关文章

最新更新