同时返回输入变量和结果的任何 Rx 运算符



当使用Rx(特别是RxJava)时,是否有一个运算符会将输入变量与函数的输出一起打包,以便在下一步中使用两者?

例如,假设我从推文 ID 列表开始,我的程序 a) 执行 REST 调用以获取该推文的文本,b) 将 id 和文本保存到本地数据库中。 正常的"map"将返回步骤 a 的文本,但它会丢弃原始 id。 在代码中:

Observable.from(tweet_id_list)
    .specialMap(i -> getTweetText(i))  // is there a "specialMap" which returns the string result AND ALSO the id?
    .map((i, s) -> saveToLocalDB(i, s)  // because we need both for the ensuing function

我破解了getTweetText(i)以返回包含两个变量而不是简单字符串的 HashMap,但如果有一个 Rx 运算符可以在不修改底层函数的情况下执行此操作,那将非常有用。

创建或使用通用的对类型或元组类型,并使用它映射流。 像这样:

Observable.from(tweet_id_list)
          .map(id -> Pair.create(id,getTweetText(id))
          .map(pair -> saveToLocalDB(pair.first(),pair.second())

如果您想要填充所有 ids->text 的完整地图:

Observable.from(tweet_id_list)
    .toMap((id) -> id, (id) -> getTweetText(id));
    .subscribe((tweetMap) -> saveAllToLocalDB(tweetMap));

这将发出(假设整数 ID 和字符串文本)单个 Map。如果您可以一次将整个地图保存到数据库,而不是像原始代码那样进行多次保存调用,这将很有用(正如我上面假设的那样)。

您希望确保避免在可观察调用中出现副作用。数据库保存等副作用应仅在您进行订阅调用并观察结果时发生,而不是在映射本身中发生。如果需要在后台进行保存,则可以在.subscribe调用之前添加类似.observeOn(Schedulers.io())的内容,具体取决于线程需求。

如果您更喜欢单个结果并逐个保存它们,则可能需要使用平面图,以便可以并行完成查找。我在这里使用defer,这样我们就不会立即阻止getTweetText调用,而是等待每个被订阅。这使我们能够控制我们执行工作的线程。请注意,由于请求可能是并发的,因此结果不一定与tweet_id_list的顺序相同。

Observable.from(tweet_id_list)
    .flatMap((id) -> Observable.defer(() -> Observable.just(
        Pair.create(id, getTweetText(id))).subscribeOn(Schedulers.io()))
    .subscribe((pair) -> saveToLocalDB(pair.first(), pair.second())

相关内容

  • 没有找到相关文章

最新更新