Rx Java Android :如何将此回调块转换为 Observer



我正在尝试通过亚马逊的S3 Android SDK上传文件。我已经使用过RX Java,但我不确定如何将此方法转换为返回可观察量的方法,因为我想将此方法的结果链接到另一个可观察量调用。我想这让我感到困惑,因为它不会立即返回,并且在 OnError 或 OnState 更改之前无法返回。如何以 RX 方式处理这些情况?

public void uploadFile(TransferObserver transferObserver){
    transferObserver.setTransferListener(new TransferListener() {
        @Override
        public void onStateChanged(int id, TransferState state) {
        }
        @Override
        public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
        }
        @Override
        public void onError(int id, Exception ex) {
        }
  });
}

如果有人可以用RX Java 2和lambdas回答,那就太好了,因为我只是一直缺少这个

通常是在异步/回调工作到反应式工作之间的桥梁的正确方法,但现在不鼓励使用Observable.create(),因为它需要高级知识才能使其正确。
您应该使用更新的创建方法Observable.fromEmitter(),它看起来非常相似:

    return Observable.fromEmitter(new Action1<Emitter<Integer>>() {
        @Override
        public void call(Emitter<Integer> emitter) {
            transObs.setTransferListener(new TransferListener() {
                @Override
                public void onStateChanged(int id, TransferState state) {
                    if (state == TransferState.COMPLETED)
                        emitter.onCompleted();
                }
                @Override
                public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
                }
                @Override
                public void onError(int id, Exception ex) {
                    emitter.onError(ex);
                }
            });
            emitter.setCancellation(new Cancellable() {
                @Override
                public void cancel() throws Exception {
                    // Deal with unsubscription:
                    // 1. unregister the listener to avoid memory leak
                    // 2. cancel the upload 
                }
            });
        }
    }, Emitter.BackpressureMode.DROP);

这里添加的是:处理取消订阅:取消上传,取消注册以避免内存泄漏,并指定背压策略。
您可以在此处阅读更多内容。

附加说明:

  • 如果您对进度感兴趣,可以在 onProgressChanged() 处调用进度的 Next(( 并将 Observable 转换为 Observable<Integer>
  • 如果没有,您可能需要考虑使用Completable,它是可观察的,没有onNext()排放,但只有onCompleted()如果您对进度指示不感兴趣,这才适合您的情况。

@Yosriz 我无法编译您的代码,但您确实帮助了我很多,因此根据您的答案,这就是我现在拥有的:

return Observable.fromEmitter(new Action1<AsyncEmitter<Integer>>() {
            @Override
            public void call(AsyncEmitter<Integer> emitter) {
                transObs.setTransferListener(new TransferListener() {
                    @Override
                    public void onStateChanged(int id, TransferState state) {
                        if (state == TransferState.COMPLETED)
                            emitter.onCompleted();
                    }
                    @Override
                    public void onProgressChanged(int id, long bytesCurrent, long bytesTotal) {
                    }
                    @Override
                    public void onError(int id, Exception ex) {
                        emitter.onError(ex);
                    }
                });
                emitter.setCancellation(new AsyncEmitter.Cancellable() {
                    @Override
                    public void cancel() throws Exception {
                        transObs.cleanTransferListener();
                    }
                });
            }
        }, AsyncEmitter.BackpressureMode.DROP);

最新更新