我正试图在没有阻塞的情况下通过Vertx事件总线发送大量消息,如下所示(与Hazelcast集群):
EventBus eb = vertx.eventBus();
for (int i = 0; i < 100; i++) {
vertx.setPeriodic(1, num -> {
eb.send("clusteredEndpoint", "ping");
});
}
当计时器的数量较少时,它可以正常工作,但在大约100个计时器时,我会出现此错误。
我想知道如何在不阻塞的情况下扩展到100K事件/秒(作为参考,我写了一个可能超过这个数字的Vertx-WebSocket测试)。
如果不可能的话,我想了解什么是阻塞——看起来像是这个类中的东西:https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/eventbus/impl/clustered/Serializer.java
参考-此代码不阻塞-即使有1000个定时器:
HttpClient client = vertx.createHttpClient();
client.webSocket(8080, "localhost", "/", res -> {
for (int i = 0; i < 1000; i++) {
vertx.setPeriodic(1, num -> {
res.result().writeTextMessage("ping");
});
}
});
});
2020年12月15日上午10:54:38 io.vertx.core.impl.BlockedThreadChecker警告:线程[vert.x-eventloop-hread-1,5,main]已被阻止36794毫秒,时间限制为2000毫秒io.vertx.core.VertxException:线程在处被阻止io.vertx.coreimpl.future.FFutureImpl.addListener(FutureImpl.java:140)在io.vertx.core.impl.foreth.PromiseImpl.addListener(PromiseImpl.java:23)在io.vertx.coreimpl.future.FFutureImpl.onComplete(FutureImpl.java:133)在io.vertx.core.impl.foreth.PromiseImpl.onComplete(PromiseImpl.java:23)在io.vertx.core.spi.cluster.inimpl.selector.Selectors.withSelector(Selectors.java:48)在io.vertx.core.spi.cluster.impl.DefaultNodeSelector.selectForSend(DefaultNodeSelector.java:42)在io.vertx.core.eventbus.impl.clustered.ClusteredEventBus$$Lambda$1065/195695453.accept(未知来源)io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue$SerializedTask.procle(Serializer.java:147)在io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue.checkPending(Serializer.java:94)在io.vertx.core.eventbus.impl.clustered.Serializer$SerializerQueue.add(Serializer.java:114)在io.vertx.core.eventbus.impl.clustered.Serializer.queue(Serializer.java:65)在io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendOrPub(ClusteredEventBus.java:172)在io.vertx.core.eventbus.impl.OboundDeliveryContext.next(OutboundDeliveryContent.java:127)在io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:394)在io.vertx.core.eventbus.impl.EventBusImpl.sendOrPubInternal(EventBusImpl.java:400)在io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:103)在io.vertx.core.eventbus.impl.EventBusImpl.send(EventBusImpl.java:97)位于io.vertx.example.EBTestClientLambda$start$0(EBtestClient.java:22)在io.vertx.example.EBtestClient$$Lambda$1056/1487467027.handle(未知来源)io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:939)在io.vertx.core.impl.VertxImpl$InternalTimerHandler.handle(VertxImpl.java:910)位于io.vertx.core.impl.EventLoopContext.eemit(EventLoopContent.java:52)位于io.vertx.core.impl.ContextImpl.emit(ContextImpl.java:294)io.vertx.core.impl.EventLoopContext.eemit(EventLoopContent.java:24)位于io.vertx.core.impl.AbstractContext.emit(AbstractContext.java:49)io.vertx.core.impl.EventLoopContext.eemit(EventLoopContent.java:24)位于io.vertx.core.impl.VertxImpl$InternalTimerHandler.run(VertxImpl.java:933)位于io.nety.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)在io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:176)在io.netty.util.concurrent.AbstractEventExecutiator.safeExecute(AbstractEventExecutior.java:164)在io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)在io.nety.channel.nio.NioEventLoop.run(NioEventLoop.java:500)io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)在io.netty.util.internal.ThreadExecutiorMap$2.run(ThreadExecutiorMap.java:74)在io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)在java.lang.Thread.run(Thread.java:748)
首先,您将在同一个线程上运行100个任务,因为Vert.x具有线程亲和性。如果你想避免这种情况,可以在不同的垂直方向上运行它们。但是,我仍然不认为你有100个CPU,所以会有很多争论。
将所有这些设置为每1ms执行一次意味着它们需要在10微秒内完成,其中包括网络代码,因为您使用的是集群EventBus。
所以,这是测试的编写方式,而不是Vert.x在做什么。
如果你真的想测试这种负载(我们这里说的是10万个rps),请将你的请求分散到多台机器上。
但事件发生后,我不确定Hazelcast是否能够处理这种负载。
如果你想知道真正阻止的是什么,我猜是这部分代码:
https://github.com/eclipse-vertx/vert.x/blob/master/src/main/java/io/vertx/core/spi/cluster/impl/DefaultNodeSelector.java#L43
由于我没有现成的集群Vert.x设置,我无法确认我的假设是否正确。
以下是我在进一步调查后的分析:
当使用Vertx事件总线进行远程通信时,一旦消费者不堪重负,它就会停止响应。这导致生产者阻塞,我捕获了3条不同的阻塞消息(见下文)。在阻塞警告之后有这样的警告:
警告:服务器2d1f2ce-940f-4b60-bf60-39847f31bcaf没有乒乓球-会认为它已经死了
我的问题的答案是这无关紧要"为什么";它被阻挡是因为它死了(因为它达到了一定的极限)。
我很惊讶Vert.x没有更优雅地处理这个问题——就像抛出一个异常一样。
阻塞错误#1
线程在io.vertx.core.impl.future.FutureImpl.addListener(FutureImpl.java:140)(位于io.vertx.core.impl.furt.PromiseImpl.add侦听器(PromiseImpl.java:23)(位于io.vertx.core.IMP.furt..FutureImpl.onComplete(FutureImpl.java:133)(位于io.vertx..core.impl.furt.PromiseImpl..onComplete java:48)
阻止错误#2
io.vertx.core.VertxException:线程被阻止位于java.nio.charset.CharsetEncoder。(CharsetEncoder.java:198)位于java.nio.charset.CharsetEncoder。(CharsetEncoder.java:233)位于sun.nio.cs.UTF_8$Encoder。(UTF_8.java:558)位于sun.nio.cs.UTF_8$Encoder。(UTF_8.java:554)在sun.nio.cs.UTF_8.newEncoder(UTF_8.java:72)
阻塞错误#3
io.vertx.core.VertxException:线程被阻止位于io.vertx.core.eventbus.impl.clustered.ConnectionHolder.writeMessage(ConnectionHolder.java:93)位于io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendRemote(ClusteredEventBus.java:332)位于io.vertx.core.eventbus.impl.clustered.ClusteredEventBus.sendToNode(ClusteredEventBus.java:283)