Netty处理程序中的JMS使用者



我正在设计一个相当复杂的系统,想知道将jms使用者(activemq、vm协议、非持久性)放入netty处理程序的最佳方法是什么。

让我解释一下,我有几个客户端使用websocket连接到我的netty服务器。对于每个客户端连接,我都会创建一个jms使用者,用于侦听有关一个或多个主题的有趣消息。如果收到一条有趣的消息,我需要在使用websocket将消息发送到客户端之前做一个额外的步骤(额外的过滤)。

以下是一个好方法吗:

  • 在SimpleChannelInboundHandler中,我声明一个私有的非静态使用者
  • 使用者在channelActive中初始化
  • 消费者在channelInactive中被破坏
  • 当消费者收到消息时,我会使用ctx.channel().write()进行额外的过滤并发送消息

在这个设置中,我有点担心消费者可能会变成慢消费者,并减慢一切速度,因为网络套接字会通过互联网。

我想出了一个更复杂的方法来将"消费者接收消息"one_answers"通过网络套接字发送消息"解耦。

  • 在SimpleChannelInboundHandler中,我声明一个私有的非静态使用者
  • 使用者在channelActive中初始化
  • 消费者在channelInactive中被破坏
  • 当消费者收到消息时,我将其放入阻塞队列
  • 每分钟我都让一个线程(为每个客户端创建)在队列中查找,并使用ctx.channel().write()将找到的消息发送给客户端

在这一点上,我有点担心每个客户端的额外线程。

或者有没有更好的方法来完成这项任务?

这是一个典型的慢速使用者问题,解决它的第一步是确定检测到慢速使用者时的适当操作。如果慢消费者错过消息是可以接受的,那么解决方案是在删除消息或从订阅源取消订阅消息方面进行一些更改。例如,如果客户端错过消息是可以接受的,那么当从JMS接收到消息时,请检查通道是否可写。如果不是,请删除该消息。如果你想给自己多一点缓冲区(尽管操作系统缓冲区很大),你可以跟踪未来尚未完成的写入完成次数(即消息尚未写入操作系统发送缓冲区),如果有太多未完成的写入请求,则可以丢弃消息。

如果客户端可能不会错过消息,并且速度一直很慢,那么问题就更难解决了。一种选择可能是将消息转移到具有特定头值的JMS队列,然后打开一个新的使用者,该使用者使用JMS选择器从该队列读取消息。这将给JMS服务器带来更多的负载,但可能适合暂时的缓慢,希望它不会干扰您的主主题提要。或者,您可能希望将消息存储在不同的存储中,例如数据库中,这样您就可以在可以发送消息时轮询消息。如果你做对了,一个轮询线程可以处理许多客户端(查询有未处理消息的客户端,然后为每个客户端加载一堆消息)。然而,这并不像使用JMS那样方便。

我不会选择选项2,因为阻塞队列只能暂时解决问题,而且您可以通过跟踪有多少写操作等待完成来实现同样的目的。

最新更新