我正在尝试通过亚马逊的S3 Android SDK上传文件。我已经使用过RX Java,但我不确定如何将此方法转换为返回可观察量的方法,因为我想将此方法的结果链接到另一个可观察量调用。我想这让我感到困惑,因为它不会立即返回,并且在 OnError 或 OnState 更改之前无法返回。如何以 RX 方式处理这些情况?
public void uploadFile(TransferObserver transferObserver){
transferObserver.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
}
});
}
如果有人可以用RX Java 2和lambdas回答,那就太好了,因为我只是一直缺少这个
通常是在异步/回调工作到反应式工作之间的桥梁的正确方法,但现在不鼓励使用Observable.create()
,因为它需要高级知识才能使其正确。
您应该使用更新的创建方法Observable.fromEmitter()
,它看起来非常相似:
return Observable.fromEmitter(new Action1<Emitter<Integer>>() {
@Override
public void call(Emitter<Integer> emitter) {
transObs.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
if (state == TransferState.COMPLETED)
emitter.onCompleted();
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
emitter.onError(ex);
}
});
emitter.setCancellation(new Cancellable() {
@Override
public void cancel() throws Exception {
// Deal with unsubscription:
// 1. unregister the listener to avoid memory leak
// 2. cancel the upload
}
});
}
}, Emitter.BackpressureMode.DROP);
这里添加的是:处理取消订阅:取消上传,取消注册以避免内存泄漏,并指定背压策略。
您可以在此处阅读更多内容。
附加说明:
- 如果您对进度感兴趣,可以在
onProgressChanged()
处调用进度的 Next(( 并将 Observable 转换为Observable<Integer>
。 - 如果没有,您可能需要考虑使用
Completable
,它是可观察的,没有onNext()
排放,但只有onCompleted()
如果您对进度指示不感兴趣,这才适合您的情况。
@Yosriz 我无法编译您的代码,但您确实帮助了我很多,因此根据您的答案,这就是我现在拥有的:
return Observable.fromEmitter(new Action1<AsyncEmitter<Integer>>() {
@Override
public void call(AsyncEmitter<Integer> emitter) {
transObs.setTransferListener(new TransferListener() {
@Override
public void onStateChanged(int id, TransferState state) {
if (state == TransferState.COMPLETED)
emitter.onCompleted();
}
@Override
public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
}
@Override
public void onError(int id, Exception ex) {
emitter.onError(ex);
}
});
emitter.setCancellation(new AsyncEmitter.Cancellable() {
@Override
public void cancel() throws Exception {
transObs.cleanTransferListener();
}
});
}
}, AsyncEmitter.BackpressureMode.DROP);