RxJava 上的运算符 可观察/可流动,用于延迟 n 项的发射



我想转换一个Flowable,以便它延迟发出项目,直到收集到指定数量的项目,然后按FIFO顺序将它们发送到下游,保持恒定的延迟项目计数。上游发出 onComplete 信号后,排队的项目应在发出 onComplete 之前刷新到下游:

(在此示例中,延迟物料编号为 3(

1 2 3 4 5 6 7 |
1 2 3 4 5 6 7 |

我没有看到任何现有的运算符执行此操作或可以修改以获得该行为。Observable.delay似乎只支持基于时间的延迟,不支持基于计数的延迟。

实现自定义运算符来实现这一点应该很容易,但也许现有运算符有更简单的方法?

您可以发布序列,跳过最后一个 N,然后将最后 N 个追加回来:

Flowable.range(1, 7)
.flatMap(v -> Flowable.timer(v * 200, TimeUnit.MILLISECONDS).map(w -> v))
.doOnNext(v -> System.out.println(v))
// -------------------------------------------------------------------
.publish(f -> 
f.skipLast(3).mergeWith(f.takeLast(3))
)
// -------------------------------------------------------------------
.blockingSubscribe(v -> System.out.println("<-- " + v));

最新更新