上下文使用Couchbase在2级文档存储上实现REST CRUD服务。数据模型是指向零个或多个项目文档的索引文档。索引文档是使用异步get作为Observable来检索的。之后是. flatmap(),它为每个项目文档检索零个或多个id。async get返回一个Observable,所以现在我创建的Observable是Observable>。我想链接一个。merge()操作符,它将接受"一个发出Observable的Observable,并将它们的输出合并到单个Observable的输出中",引用ReactiveX文档:)然后我将。subscribe()用于单个Observable来检索条目文档。.merge()操作符有许多签名,但我不知道如何在一系列操作符中使用它,如下所示:
bucket
.async()
.get(id)
.flatMap(
document -> {
JsonArray itemArray = (JsonArray) document.content().get("item");
// create Observable that gets and emits zero or more
// Observable<Observable<JsonDocument>> ie. bucket.async().gets
Observable<Observable<JsonDocument>> items =
Observable.create(observer -> {
try {
if (!observer.isUnsubscribed()) {
itemArray.forEach(
(jsonObject) -> {
String itemId = ((JsonObject)jsonObject).get("itemid").toString();
observer.onNext(
bucket.async().get(itemId)
);
}
}
);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
);
return items;
},
throwable -> {
// error handling omitted...
return Observable.empty();
},
() -> {
// on complete processing omitted...
return null;
}
)
.merge( ???????? )
.subscribe(
nextItem -> {
// do something with each item document...
},
throwable -> {
// error handling omitted...
},
() -> {
// do something else...
}
);
编辑:你可能猜到我是一个被动的新手。@akarnokd的回答让我意识到我所做的事情是愚蠢的。解决方案是将项目Observable<Observable<JsonDocument>>
的排放合并到document
封闭中,并返回其结果。这将从flatMap
发出最终的JsonDocuments
:
bucket
.async()
.get(id)
.flatMap(
document -> {
JsonArray itemArray = (JsonArray) document.content().get("item");
// create Observable that gets and emits zero or more
// Observable<Observable<JsonDocument>> ie. bucket.async().gets
Observable<Observable<JsonDocument>> items =
Observable.create(observer -> {
try {
if (!observer.isUnsubscribed()) {
itemArray.forEach(
(jsonObject) -> {
String itemId = ((JsonObject)jsonObject).get("itemid").toString();
observer.onNext(
bucket.async().get(itemId)
);
}
}
);
observer.onCompleted();
}
} catch (Exception e) {
observer.onError(e);
}
}
);
return Observable.merge(items);
},
throwable -> {
// error handling omitted...
return Observable.empty();
},
() -> {
// on complete processing omitted...
return null;
}
)
.subscribe(
nextItem -> {
// do something with each item document...
},
throwable -> {
// error handling omitted...
},
() -> {
// do something else...
}
);
测试成功:)
由于Java的表达限制,我们不能有一个可以应用于Observble<Observable<T>>
的无参数merge()
操作符。它需要像c#那样的扩展方法。
下一个最好的办法是做一个标识flatMap
:
// ...
.flatMap(document -> ...)
.flatMap(v -> v)
.subscribe(...)
可以调用toList()
将所有发出的项收集到一个列表中。我还没有测试过,但是像这样呢:
bucket.async()
.get(id)
.flatmap(document -> { return Observable.from((JsonArray)document.content().get("item")})
.flatMap(bucket::get)
.toList()
.subscribe(results -> /* list of documents */);