当使用Rx(特别是RxJava)时,是否有一个运算符会将输入变量与函数的输出一起打包,以便在下一步中使用两者?
例如,假设我从推文 ID 列表开始,我的程序 a) 执行 REST 调用以获取该推文的文本,b) 将 id 和文本保存到本地数据库中。 正常的"map"将返回步骤 a 的文本,但它会丢弃原始 id。 在代码中:
Observable.from(tweet_id_list)
.specialMap(i -> getTweetText(i)) // is there a "specialMap" which returns the string result AND ALSO the id?
.map((i, s) -> saveToLocalDB(i, s) // because we need both for the ensuing function
我破解了getTweetText(i)
以返回包含两个变量而不是简单字符串的 HashMap,但如果有一个 Rx 运算符可以在不修改底层函数的情况下执行此操作,那将非常有用。
创建或使用通用的对类型或元组类型,并使用它映射流。 像这样:
Observable.from(tweet_id_list)
.map(id -> Pair.create(id,getTweetText(id))
.map(pair -> saveToLocalDB(pair.first(),pair.second())
如果您想要填充所有 ids->text 的完整地图:
Observable.from(tweet_id_list)
.toMap((id) -> id, (id) -> getTweetText(id));
.subscribe((tweetMap) -> saveAllToLocalDB(tweetMap));
这将发出(假设整数 ID 和字符串文本)单个 Map。如果您可以一次将整个地图保存到数据库,而不是像原始代码那样进行多次保存调用,这将很有用(正如我上面假设的那样)。
您希望确保避免在可观察调用中出现副作用。数据库保存等副作用应仅在您进行订阅调用并观察结果时发生,而不是在映射本身中发生。如果需要在后台进行保存,则可以在.subscribe
调用之前添加类似.observeOn(Schedulers.io())
的内容,具体取决于线程需求。
如果您更喜欢单个结果并逐个保存它们,则可能需要使用平面图,以便可以并行完成查找。我在这里使用defer
,这样我们就不会立即阻止getTweetText调用,而是等待每个被订阅。这使我们能够控制我们执行工作的线程。请注意,由于请求可能是并发的,因此结果不一定与tweet_id_list
的顺序相同。
Observable.from(tweet_id_list)
.flatMap((id) -> Observable.defer(() -> Observable.just(
Pair.create(id, getTweetText(id))).subscribeOn(Schedulers.io()))
.subscribe((pair) -> saveToLocalDB(pair.first(), pair.second())