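I need to get camera IDs from the server, then get each camera's events by its ID, and finally get the images by ID. All of this then has to be combined into a UI post {camera name - image URL - its events}. For example, we might have 5 camera names, 30 event IDs (6 per camera) and 30 URLs. I tried using zip to combine the observable API calls that (separately) fetch the camera names, event IDs and image URLs. But as far as I understand, zip cannot correctly match 5 names with 30 IDs, so it does not work. What should I use for this?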
GetApiMethods getApiMethods = NetworkService.getInstance().createService(GetApiMethods.class);
Observable<Camera> cameras = getCamera();
Observable<Event> events = getEvents();
Observable<Post> combined = Observable.zip(cameras, events, (camera, event) -> new Post(camera.getDisplayName(), "null", event.getType()));

public LiveData<String> CameraRequests() {
    combined.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .concatMap(Observable::fromArray)
            .subscribe(new Observer<Post>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                }

                @Override
                public void onNext(@NonNull Post post) {
                }

                @Override
                public void onError(@NonNull Throwable e) {
                }

                @Override
                public void onComplete() {
                }
            });
    return liveData;
}

public Observable<Event> getEvents() {
    return Observable.create(allEvents -> getApiMethods.getCameras()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .concatMap(Observable::fromIterable)
            .concatMap(Observable::fromIterable)
            .concatMap(camera -> getApiMethods.getEvents(camera.getAccessPoint().replace("hosts", "")))
            .concatMap(Observable::fromIterable)
            .map(Event::getId)
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                }

                @Override
                public void onNext(@NonNull String event) {
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    Log.d("RXJAVA1", String.valueOf(e));
                }

                @Override
                public void onComplete() {
                }
            })
    );
}

public Observable<Camera> getCamera() {
    return Observable.create(cameras1 -> getApiMethods.getCameras()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .concatMap(Observable::fromIterable)
            .map(Camera::getDisplayName)
            .subscribe(new Observer<String>() {
                @Override
                public void onSubscribe(@NonNull Disposable d) {
                }

                @Override
                public void onNext(@NonNull String s) {
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    Log.d("RXJAVA", String.valueOf(e));
                }

                @Override
                public void onComplete() {
                }
            })
    );
}
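To see concretely why zip falls short here: zip pairs items by position and completes as soon as its shortest source completes, so 5 names zipped with 30 event IDs give only 5 pairs, and the remaining 25 IDs are dropped. The sketch below assumes RxJava 2 (the io.reactivex package); ZipPairingDemo and the generated names are made up for illustration.

```java
import io.reactivex.Observable;
import java.util.List;

public class ZipPairingDemo {
    // Zips 5 hypothetical camera names with 30 hypothetical event IDs.
    static List<String> zipNamesWithEvents() {
        Observable<String> names = Observable.range(1, 5).map(i -> "cam_" + i);
        Observable<String> eventIds = Observable.range(1, 30).map(i -> "event_" + i);
        // zip combines the n-th name with the n-th ID and stops
        // when the shorter source (names) completes.
        return Observable.zip(names, eventIds, (name, id) -> name + " - " + id)
                .toList()
                .blockingGet(); // blocking is for the demo only
    }

    public static void main(String[] args) {
        // Prints 5 lines, from "cam_1 - event_1" to "cam_5 - event_5"
        zipNamesWithEvents().forEach(System.out::println);
    }
}
```

The events of one camera have to be requested from that camera's own item in the chain, which is what a nested mapping operator does.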
I would use flatMap for this:
Observable<Post> getPostsObservable() {
    return getCameras().flatMap(camera ->
            // for each camera, build observable to get events
            getEvents(camera.accessPoint).flatMap(event ->
                    // for each event, build observable to get imageUrl
                    getImageUrl(event.id).map(imageUrl ->
                            // combine camera, event and imageUrl in current rx-chain
                            new Post(camera, event, imageUrl)
                    )
            )
    );
}
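One thing to keep in mind with the snippet above: flatMap merges the inner observables, so once the real API calls run asynchronously, posts may arrive interleaved across cameras. If the order matters, concatMap is a drop-in alternative that subscribes to one inner observable at a time. A minimal sketch, assuming RxJava 2; the three helper methods are hypothetical stand-ins for the real API calls and use plain strings instead of Camera and Event objects:

```java
import io.reactivex.Observable;
import java.util.Arrays;
import java.util.List;

public class OrderedPostsDemo {
    // Hypothetical stand-ins for the real API calls
    static Observable<String> cameras() {
        return Observable.fromIterable(Arrays.asList("cam_1", "cam_2"));
    }

    static Observable<String> events(String cameraId) {
        return Observable.just("event_1_" + cameraId, "event_2_" + cameraId);
    }

    static Observable<String> imageUrl(String eventId) {
        return Observable.just("http://my.domain.com/" + eventId + ".png");
    }

    // Same nesting as the flatMap version, but concatMap keeps source order
    static List<String> orderedPosts() {
        Observable<String> posts = cameras().concatMap(camera ->
                events(camera).concatMap(event ->
                        imageUrl(event).map(url -> camera + "; " + event + "; " + url)));
        return posts.toList().blockingGet(); // blocking is for the demo only
    }

    public static void main(String[] args) {
        orderedPosts().forEach(System.out::println);
    }
}
```

The trade-off is that concatMap waits for each inner observable to complete before starting the next one, so the requests are no longer issued concurrently.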
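You may need to adjust your code a little, for example by simplifying your getCameras and getEvents methods. Have a look at the demo below, which mimics your application, to get a better idea of how it works and what needs to change. If you call the getPosts method, you should see in logcat that the Posts are created and received correctly.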
import android.util.Log;

import androidx.annotation.NonNull;

import java.util.ArrayList;
import java.util.List;

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;

public class RxDemo {

    // your Camera class
    public static class Camera {
        private final String accessPoint;

        public Camera(String accessPoint) {
            this.accessPoint = accessPoint;
        }

        public String getAccessPoint() {
            return accessPoint;
        }
    }

    // your Event class
    public static class Event {
        private final String id;

        public Event(String id) {
            this.id = id;
        }

        public String getId() {
            return id;
        }
    }

    // your Post class
    public static class Post {
        private final Camera camera;
        private final Event event;
        private final String imageUrl;

        public Post(Camera camera, Event event, String imageUrl) {
            this.camera = camera;
            this.event = event;
            this.imageUrl = imageUrl;
        }

        public Camera getCamera() {
            return camera;
        }

        public Event getEvent() {
            return event;
        }

        public String getImageUrl() {
            return imageUrl;
        }

        @NonNull
        @Override
        public String toString() {
            return "camera: " + camera.getAccessPoint() + "; event: " + event.getId() + "; imageUrl: " + imageUrl;
        }
    }

    // mocked getApiMethods.getCameras(), returns 5 mocked cameras
    private Observable<List<Camera>> apiGetCameras() {
        List<Camera> cameras = new ArrayList<>();
        cameras.add(new Camera("cam_1"));
        cameras.add(new Camera("cam_2"));
        cameras.add(new Camera("cam_3"));
        cameras.add(new Camera("cam_4"));
        cameras.add(new Camera("cam_5"));
        return Observable.just(cameras);
    }

    // mocked getApiMethods.getEvents, returns 6 mocked events with the cameraId as suffix
    private Observable<List<Event>> apiGetEvents(String cameraId) {
        List<Event> events = new ArrayList<>();
        events.add(new Event("event_1_" + cameraId));
        events.add(new Event("event_2_" + cameraId));
        events.add(new Event("event_3_" + cameraId));
        events.add(new Event("event_4_" + cameraId));
        events.add(new Event("event_5_" + cameraId));
        events.add(new Event("event_6_" + cameraId));
        return Observable.just(events);
    }

    // mocked getApiMethods.getImageUrl
    private Observable<String> apiGetImageUrl(String eventId) {
        return Observable.just("http://my.domain.com/" + eventId + ".png");
    }

    // flatten the list of cameras into single Camera emissions
    public Observable<Camera> getCameras() {
        return apiGetCameras().concatMapIterable(cam -> cam);
    }

    // flatten the list of events into single Event emissions
    public Observable<Event> getEvents(String cameraId) {
        return apiGetEvents(cameraId).concatMapIterable(event -> event);
    }

    // build observable that emits posts
    private Observable<Post> getPostsObservable() {
        return getCameras().flatMap(camera ->
                // for each camera, build observable to get events
                getEvents(camera.getAccessPoint()).flatMap(event ->
                        // for each event, build observable to get imageUrl
                        apiGetImageUrl(event.getId()).map(imageUrl ->
                                // combine camera, event and imageUrl in current rx-chain
                                new Post(camera, event, imageUrl)
                        )
                )
        );
    }

    // call this in your activity/fragment/viewModel to get the posts
    public void getPosts() {
        getPostsObservable()
                .subscribeOn(Schedulers.io())
                .subscribe(new Observer<Post>() {
                    @Override
                    public void onSubscribe(@NonNull Disposable d) {
                    }

                    @Override
                    public void onNext(@NonNull Post post) {
                        Log.d("AAAA", post.toString());
                    }

                    @Override
                    public void onError(@NonNull Throwable e) {
                        // log the failure instead of silently swallowing it
                        Log.e("AAAA", "Failed to load posts", e);
                    }

                    @Override
                    public void onComplete() {
                    }
                });
    }
}
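If the UI needs the whole list at once (for example to hand it to a LiveData or an adapter) rather than one post at a time, toList() can collect the emissions into a single List. A rough sketch, assuming RxJava 2; CollectPostsDemo and its string-based posts are hypothetical placeholders for getPostsObservable() and the Post class above:

```java
import io.reactivex.Observable;
import io.reactivex.Single;
import java.util.List;

public class CollectPostsDemo {
    // Hypothetical stand-in for getPostsObservable(): 5 cameras x 6 events each
    static Observable<String> postsObservable() {
        return Observable.range(1, 5).flatMap(cam ->
                Observable.range(1, 6).map(ev ->
                        "cam_" + cam + "; event_" + ev + "_cam_" + cam));
    }

    // toList() waits for onComplete, then emits all posts as one List
    static Single<List<String>> allPosts() {
        return postsObservable().toList();
    }

    public static void main(String[] args) {
        List<String> posts = allPosts().blockingGet(); // blocking is for the demo only
        System.out.println(posts.size() + " posts collected");
    }
}
```

In an Android ViewModel you would typically subscribe to the Single instead of blocking, and pass the list on to the UI from onSuccess.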