Rx用限制+超时缓冲快速生产流

  • 本文关键字:缓冲 超时 Rx rx-java
  • 更新时间 :
  • 英文 :


我试图通过在适当的时候执行批处理请求来优化RxJava的web服务调用,但不会给响应带来太多延迟。为此,我使用buffer(closingSelector)操作符,debounce()作为关闭选择器,如下所示:

Observable<BaseCall<T, R>> burstyMulticast = requestStream.share();
Observable<BaseCall<T, R>> burstyDeBounced = burstyMulticast.debounce(windowSize, windowUnit);
burstyMulticast.buffer(burstyDeBounced).subscribe(/* call external WS with batches */);

它工作得很好,除了如果requestStream产生太快,它会发出大量的批,太大的WS处理一次,所以我想以某种方式限制批大小。所以我需要一个closingSelector,如果缓冲区中有X个项目,或者自从上一个项目从上游到达后经过了Y个时间,它就会发出一个关闭事件。

我似乎找不到一个好的解决方案,而不是实现一个自定义的Operator类似于OperatorDebounceWithTime,但有一个内部缓冲区,返回缓冲区中的所有元素,而不是最后一个。

是否有更简单的方法来实现这一点,例如通过组合一些操作?

编辑:

在发布问题后,我意识到上面的代码块有另一个问题:如果请求流连续快于脱线超时(requestStreamwindowSize产生更快),那么burstyDeBounced不会发出任何东西,所以所有请求将被缓冲,直到在传入流中有足够长的暂停。

您可以将未绑定源的大缓冲区拆分为较小的缓冲区:

Observable<BaseCall<T, R>> burstyMulticast = requestStream.share();
Observable<BaseCall<T, R>> burstyDeBounced = burstyMulticast
    .debounce(windowSize, windowUnit);
burstyMulticast.buffer(burstyDeBounced)
.onBackpressureBuffer()
.concatMapIterable(list -> Lists.partition(list, windowSizeLimit))
.subscribe(...);

列表。分区来自Google Guava。

我最终实现了一个自定义op: https://gist.github.com/zsoltm/79462b37c0943b4fbef2ee3968155f27,它似乎工作得很好。我很乐意接受改进建议。

相关内容

  • 没有找到相关文章

最新更新