首先,如果这个问题在其他地方得到了处理,我很抱歉,我只是没有找到解决我特定问题的方法。
我有一个网关服务器,它从外部接收消息。它把它放在我的订单处理服务器正在侦听的队列中。队列1—网关服务器,队列2—清除服务器。
因此,在我的订单处理器中,我有工作线程。我正在使用ExecutorService来管理我的线程。问题出在工作线程中。
在工作线程中,我创建了两个MQ实例,用于将消息发布到清除服务器或网关服务器。我基本上需要做一些处理,然后将该消息发布到这些队列中。
我想知道的是,每次处理完消息时,我是否应该关闭工作线程中的通道和连接?
如果我在处理完一条消息后没有关闭每个工作线程上的MQ连接,那么在处理完8-900条消息后,我会间歇性地出现以下异常:
java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Unknown Source)
at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:307)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)
at interfaces.MQ.<init>(MQ.java:41)
at orderProcessor.ProcessOrders.<init>(ProcessOrders.java:109)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:52)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
at orderProcessor.ProcessIncomingCSThread.spawnThread(ProcessIncomingCSThread.java:70)
at orderProcessor.ProcessIncomingCSThread.routeIncoming(ProcessIncomingCSThread.java:45)
at interfaces.ProcessIncomingThread.run(ProcessIncomingThread.java:47)
如果我在处理完一条消息后关闭了每个工作线程上的连接,那么过一段时间后,我会间歇性地出现以下异常:]
Exception in thread "AMQP Connection 127.0.0.1:5672" java.lang.OutOfMemoryError: unable to create new native thread
at java.lang.Thread.start0(Native Method)
at java.lang.Thread.start(Unknown Source)
at com.rabbitmq.client.impl.ChannelManager.scheduleShutdownProcessing(ChannelManager.java:108)
at com.rabbitmq.client.impl.ChannelManager.handleSignal(ChannelManager.java:94)
at com.rabbitmq.client.impl.AMQConnection.finishShutdown(AMQConnection.java:696)
at com.rabbitmq.client.impl.AMQConnection.shutdown(AMQConnection.java:669)
at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:550)
当我创建一个MQ连接以供使用以保持RabbitMQ的内部队列合理时,我使用basicQos
。
我以以下方式创建MQ连接以供使用:
_channel.queueDeclare(this._mqName.toString(), true, false, false, null);
_channel.basicConsume(this._mqName.toString(), true, _consumer);
_channel.basicQos(50);
感谢您查看此信息,如有任何建议或帮助,我们将不胜感激。我很可能没有根据自己的情况正确行事。。
似乎有内存泄漏。使用探查器。
感谢大家的投入。我解决了这个问题。
我在每个工作线程中创建连接。现在,我在主线程上创建连接,并将其传递给从该连接创建通道的工作线程。这似乎是一种享受。
然而,这意味着,我将不得不重新设计MQ类来处理此工作流。
我不熟悉RabbitMQ,但我怀疑您或RabbitMQ正在尝试创建更多配置为操作系统处理的线程。
也许这两个链接可以帮助你:
- "java.lang.OutOfMemoryError:无法创建新的本机线程"
- http://javaeesupportpatterns.blogspot.de/2012/09/outofmemoryerror-unable-to-create-new.html