我正在尝试理解RxJava。我写了一小段代码,它将上传一个列表文件到firebase服务器。我写了两个observable:1. ImageUploaderService.java:上传单个文件并通知状态2. java:创建新的单个文件上传器,并为其提供一个要上传的文件路径。收集所有上传者的结果,并将上传文件的链接通知调用者。
我的问题是,即使ImageUploaderService的每个实例都在AllImageUploaderService中订阅,只有OnNext的一个事件被触发。我该怎么修理它?
AllImageUploaderService.java
public class AllImageUploaderService extends Subscriber<String>{
private BehaviorSubject<String[]> uploadService;
private ArrayList<String> uploadedFiles;
private int expectedCount;
private int actualCount;
public Observable<String[]> uploadImages(final String[] fileNames, final StorageReference imagesRef,
final String[] chosenImage) {
expectedCount = fileNames.length;
uploadedFiles = new ArrayList<>(fileNames.length);
uploadService = BehaviorSubject.create();
Observable<String[]> observable = Observable.create(new Observable.OnSubscribe<String[]>(){
@Override
public void call(Subscriber<? super String[]> subscriber) {
for (int i = 0; i < fileNames.length; i++) {
ImageUploaderService imageUploaderService=new ImageUploaderService();
Observable<String> observable= imageUploaderService.uploadImage(fileNames[i],imagesRef,chosenImage[i]);
observable.subscribe(AllImageUploaderService.this);
}
}
});
observable.subscribe(uploadService);
return uploadService;
}
@Override
public void onCompleted() {
actualCount++;
if(expectedCount==actualCount) {
String[] retValue = new String[uploadedFiles.size()];
uploadedFiles.toArray(retValue);
uploadService.onNext(retValue);
uploadService.onCompleted();
}
}
@Override
public void onError(Throwable e) {
Utils.logE(this,"Error",e);
}
@Override
public void onNext(String s) {
uploadedFiles.add(s);
}
}
ImageUploaderService.java
public class ImageUploaderService implements OnFailureListener, OnSuccessListener<UploadTask.TaskSnapshot> {
private BehaviorSubject<String> uploadSubject;
public Observable<String> uploadImage(final String fileNames, final StorageReference imagesRef,
final String chosenImage) {
uploadSubject = BehaviorSubject.create();
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
StorageReference spaceRef = imagesRef.child(fileNames);
UploadTask uploadTask = spaceRef.putFile(UriUtil.generatorUri(chosenImage, UriUtil.LOCAL_FILE_SCHEME));
uploadTask.addOnFailureListener(ImageUploaderService.this);
uploadTask.addOnSuccessListener(ImageUploaderService.this);
}
});
observable.subscribe(uploadSubject);
return uploadSubject;
}
@Override
public void onFailure(@NonNull Exception exception) {
uploadSubject.onError(exception);
uploadSubject.onCompleted();
}
@Override
public void onSuccess(UploadTask.TaskSnapshot taskSnapshot) {
Uri downloadUrl = taskSnapshot.getDownloadUrl();
uploadSubject.onNext(downloadUrl.toString());
uploadSubject.onCompleted();
}
}
如果您能告诉我更好的方法,那就太好了。
不需要把事情弄得太复杂:
public Observable<String> uploadImages(final String[] fileNames, final StorageReference imagesRef,
final String[] chosenImage) {
return Observable
.range(0, fileNames.length)
.flatMap(i -> new ImageUploaderService()
.uploadImage(fileNames[i],imagesRef,chosenImage[i]))
}
当一切结束时,你需要它作为一个集合吗?.toList()
.
另外,我认为你不需要ImageUploaderService:
中的主题public class ImageUploader implements OnFailureListener, OnSuccessListener<UploadTask.TaskSnapshot> {
public static Observable<String> uploadImage(final String fileNames, final StorageReference imagesRef,
final String chosenImage) {
Observable<String> observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
ImageUploader uploader = new ImageUploader(subscriber);
StorageReference spaceRef = imagesRef.child(fileNames);
UploadTask uploadTask = spaceRef.putFile(UriUtil.generatorUri(chosenImage, UriUtil.LOCAL_FILE_SCHEME));
uploadTask.addOnFailureListener(uploader);
uploadTask.addOnSuccessListener(uploader);
}
});
return observable;
}
/////////
private final Subscriber<String> subscriber;
private ImageUploader(final Subscriber<String> subscriber) {
this.subscriber = subscriber;
}
@Override
public void onFailure(@NonNull Exception exception) {
if(subscriber.isUnsubscribed()) return;
subscriber.onError(exception);
}
@Override
public void onSuccess(UploadTask.TaskSnapshot taskSnapshot) {
if(subscriber.isUnsubscribed()) return;
Uri downloadUrl = taskSnapshot.getDownloadUrl();
uploadSubject.onNext(downloadUrl.toString());
uploadSubject.onCompleted();
}
}
注意,不要在onError()
之后发出onComplete()
,这两个都是可观察对象的终结状态,并且都应该是最后一个。