以块为单位从通量中读取<Integer>



是否可以以块的形式从webflux流量中读取?(使用delayElements除外(

例如,在我写之后

Flux.range(1, 10).doOnNext(System.out::println).take(5).subscribe();

有什么方法可以继续读取接下来的5个整数吗?

如果没有,消费者是否有其他选择来决定何时请求下一条数据?

编辑:

为了澄清,我想读取前5个值,然后暂停直到程序中的任意稍后时间,然后读取接下来的5个值而不重新创建发射器通量。简单地调用buffer((是做不到的

那么您需要一个完整的异步订阅者对象,而不仅仅是一个方法链。

// use maven dependency 'org.df4j:df4j-core:8.3'
import org.df4j.core.dataflow.Actor;
import org.df4j.core.port.InpFlow;
import org.junit.Assert;
import org.junit.Test;
import reactor.core.publisher.Flux;
public class FluxSubscriberTest {
@Test
public  void test10() {
FluxSubscriber subscriber = new FluxSubscriber();
Flux.range(1, 10).subscribe(subscriber.inp);
subscriber.start();
boolean ok = subscriber.blockingAwait(5000);
Assert.assertTrue(ok);
}
static class FluxSubscriber extends Actor {
InpFlow<Integer> inp = new InpFlow<>(this, 5);
int count = 0;
@Override
protected void runAction() throws Throwable {
if (inp.isCompleted()) {
System.out.println("input stream completed");
complete();
return;
}
Integer value = inp.remove();
System.out.println("value="+value);
if (++count==5) {
count = 0;
System.out.println("pause:");
delay(1000);
}
}
}
}

事实上,它首先读取5个项目,然后在每次调用inp.remove()之后逐个读取。如果这不是您想要的,那么您可以扩展类InpFlow,以便在调用Subscription.request()时修改策略。

源代码可在https://github.com/akaigoro/df4j(是的,我是作者(。

buffer是您要查找的关键字

Flux.range(1, 10)
.buffer(5)
.doOnNext(System.out::println)
.subscribe();

输出:

[1, 2, 3, 4, 5]
[6, 7, 8, 9, 10]

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html#buffer--

相关内容

最新更新