我的系统是同一Node和同一JVM中的高速流系统,我想使用NetServer接收消息,并使用事件总线将消息转发到同一Vertx中的其他Verticle。
正如我们所知,在Vert.x中,NetServer是由事件循环线程调用的。我遇到了一个问题。假设事件总线使用者速度慢,而NetServer速度太快,无法将数据发送到事件总线,如果使用者缓冲区已满,则事件总线将丢弃并丢失数据。所以我在想,是否有任何方法可以在NetServer中进行流量控制?或者事件总线对NetServer有背压,以降低发送到事件总线的速度。
我感谢你的想法和意见。
据我所见,Vert.x EventBus不会为本地消费者丢弃消息。
尽管文档中指出DEFAULT_ACCEPT_BACKLOG
设置为1024,但它实际上是-1:
https://github.com/eclipse-vertx/vert.x/blob/1ab9884cb29adea75b378c3fa7359813f3de68f9/src/main/java/io/vertx/core/net/NetServerOptions.java#L45
而且我看不出它在任何地方被用来限制EventBus的大小。
您也可以使用此测试来确保所有消息都能真正到达:
public class BackpressureExample {
public static void main(String[] args) {
Vertx vertx = Vertx.vertx(new VertxOptions().setEventBusOptions(new EventBusOptions().setAcceptBacklog(1)));
vertx.deployVerticle(new SlowConsumer(), new DeploymentOptions().setWorker(true), h -> {
for (int i = 0; i < 10_000; i++) {
vertx.eventBus().send("address", UUID.randomUUID().toString());
}
System.out.println("Done producing");
});
}
private static class SlowConsumer extends AbstractVerticle {
@Override
public void start() {
AtomicInteger counter = new AtomicInteger();
this.vertx.eventBus().consumer("address", h -> {
System.out.println(counter.incrementAndGet());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
}
}
您可以在这里看到如何向当地消费者传递信息:https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/eventbus/impl/EventBusImpl.java#L359