java文档说以下内容:
仅当发布者为该特定的最后一个值提供的时间窗口中没有发出新值时,才发出最后一个值。
。
但是,我发现上述描述令人困惑。我在吉特聊天中读到,它类似于在rxjava中的辩论。有人可以用一个例子来说明它吗?进行彻底搜索后,我找不到任何地方。
sampleTimeout
让您将伴侣Flux
X'
与源中每个传入的值x
相关联。如果X'
在源中发出下一个值之前完成,则将发出值x
。如果没有,则删除x
。相同的处理应用于后续值。
将其视为将原始序列分为窗口,从每个伴侣通量的开始和完成。如果两个窗口重叠,则触发第一个的值将被删除。
在另一侧,您的sample(Duration)
仅处理一个伴侣通量。它将序列分配到正常时间段的连续的窗口中,并在特定窗口期间丢弃除最后一个元素以外的所有元素。
(编辑(:关于您的用例
如果我正确理解,看来您的处理时间是您要定期安排的不同长度的处理,但是您也不想考虑哪种处理需要多个 erem> ?
如果是这样,听起来您要1(使用publishOn
和2(在要求的第二部分中仅需要sample(Duration)
(分配给任务的延迟没有更改(。
类似的东西:
List<Long> passed =
//regular scheduling:
Flux.interval(Duration.ofMillis(200))
//this is only to show that processing is indeed started regularly
.elapsed()
//this is to isolate the blocking processing
.publishOn(Schedulers.elastic())
//blocking processing itself
.map(tuple -> {
long l = tuple.getT2();
int sleep = l % 2 == 0 || l % 5 == 0 ? 100 : 210;
System.out.println(tuple.getT1() + "ms later - " + tuple.getT2() + ": sleeping for " + sleep + "ms");
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
return l;
})
//this is where we say "drop if too long"
.sample(Duration.ofMillis(200))
//the rest is to make it finite and print the processed values that passed
.take(10)
.collectList()
.block();
System.out.println(passed);
输出:
205ms later - 0: sleeping for 100ms
201ms later - 1: sleeping for 210ms
200ms later - 2: sleeping for 100ms
199ms later - 3: sleeping for 210ms
201ms later - 4: sleeping for 100ms
200ms later - 5: sleeping for 100ms
201ms later - 6: sleeping for 100ms
196ms later - 7: sleeping for 210ms
204ms later - 8: sleeping for 100ms
198ms later - 9: sleeping for 210ms
201ms later - 10: sleeping for 100ms
196ms later - 11: sleeping for 210ms
200ms later - 12: sleeping for 100ms
202ms later - 13: sleeping for 210ms
202ms later - 14: sleeping for 100ms
200ms later - 15: sleeping for 100ms
[0, 2, 4, 5, 6, 8, 10, 12, 14, 15]
因此,大约每200ms触发了阻塞处理,并且仅值在200ms之内进行处理。