我正在尝试创建一个带有EJB注释的简单MDB,以便我可以异步执行任务。
有数千个缓慢的任务要执行,硬件有许多处理器和 RAM,所以我需要以并发方式运行它们(在许多线程中(。它在开始时(在许多线程中(以这种方式工作,但是在某些消息之后,它会"缩小"并一次只处理一条消息。
以下是一些相关信息:
-
我正在使用Java 1.8.0_221和WildFly 19.1.0。
-
这是我的 MDB 消费者:
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:/jms/queue/MessageQueue"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
})
public class MessageConsumerTest implements MessageListener {
(...)
}
这个消费者有一些注入的依赖关系。它们中的大多数都有
@Stateless
注释,有些具有@Singleton
注释。对于单一实例依赖项,它们都具有@ConcurrencyManagement(ConcurrencyManagementType.BEAN)
注释。在
standalone-full.xml
中声明的应用程序数据源具有jta="false"
参数。我的队列是非持久性的(如果 WildFly 停止,生产者会检查并再次重新发送所有挂起的消息(,所以我在我的生产者中做了这个:
@Inject
@JMSConnectionFactory("java:jboss/DefaultJMSConnectionFactory")
private JMSContext context;
@Resource(mappedName = "java:/jms/queue/MessageQueue")
private Queue queue;
(...)
context.createProducer().setDeliveryMode(DeliveryMode.NON_PERSISTENT).send(queue, msg);
我尝试更改很多东西(standalone-full.xml
中的池大小,MDB消费者中的@ActivationConfigProperty
注释,-D....
参数(,但没有一个有效。结果总是相同的:MDB 开始同时处理许多对象,但下降到一个。
改变这种行为和/或进行更深入分析的正确方法是什么?
提前感谢!
更多信息:我尝试在"jboss-cli"应用程序中检查队列运行命令,结果如下:
一开始,有 3 个"ServerConsumer"(这是预期的行为(:
[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
consumerName=ServerConsumer [id=f3e141c0-b56d-11ea-8030-b8aeed89da5a:f3e1b6f2-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378547L
expiration=0
type=3
priority=4
userID=ID:227e8bd1-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929521797L
consumerName=ServerConsumer [id=f3ea1b75-b56d-11ea-8030-b8aeed89da5a:f3ea4287-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378523L
expiration=0
type=3
priority=4
userID=ID:f63d0c39-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447548L
consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378520L
expiration=0
type=3
priority=4
userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447299L
一段时间后,只有 2 个ServerConsumer
实例,但似乎是给其中一个的两条消息:
[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
consumerName=ServerConsumer [id=f3d29bb3-b56d-11ea-8030-b8aeed89da5a:f3d66c45-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378580L
expiration=0
type=3
priority=4
userID=ID:6a77afd0-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929642548L
consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378520L
expiration=0
type=3
priority=4
userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447299L
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378565L
expiration=0
type=3
priority=4
userID=ID:34d2c6c7-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929552548L
再过一段时间,只有一个ServerConsumer
实例,它似乎正在处理所有消息。此时,并发性消失了!
[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378520L
expiration=0
type=3
priority=4
userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447299L
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378565L
expiration=0
type=3
priority=4
userID=ID:34d2c6c7-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929552548L
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378610L
expiration=0
type=3
priority=4
userID=ID:9751c5ea-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929717797L
我相信这是由于 MDB 底层 JMS 会话中的消息缓冲。尝试将consumerWindowSize
激活配置属性设置为0
,例如:
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:/jms/queue/MessageQueue"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "consumerWindowSize", propertyValue = "0")
})
public class MessageConsumerTest implements MessageListener {
(...)
}
此设置在 ActiveMQ Artemis 文档中进一步讨论。