无法在 MDB 中同时处理消息



我正在尝试创建一个带有EJB注释的简单MDB,以便我可以异步执行任务。

有数千个缓慢的任务要执行,硬件有许多处理器和 RAM,所以我需要以并发方式运行它们(在许多线程中(。它在开始时(在许多线程中(以这种方式工作,但是在某些消息之后,它会"缩小"并一次只处理一条消息。

以下是一些相关信息:

  1. 我正在使用Java 1.8.0_221和WildFly 19.1.0。

  2. 这是我的 MDB 消费者:

@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:/jms/queue/MessageQueue"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")
})
public class MessageConsumerTest implements MessageListener {
(...)
}
  1. 这个消费者有一些注入的依赖关系。它们中的大多数都有@Stateless注释,有些具有@Singleton注释。对于单一实例依赖项,它们都具有@ConcurrencyManagement(ConcurrencyManagementType.BEAN)注释。

  2. standalone-full.xml中声明的应用程序数据源具有jta="false"参数。

  3. 我的队列是非持久性的(如果 WildFly 停止,生产者会检查并再次重新发送所有挂起的消息(,所以我在我的生产者中做了这个:

@Inject
@JMSConnectionFactory("java:jboss/DefaultJMSConnectionFactory")
private JMSContext context;
@Resource(mappedName = "java:/jms/queue/MessageQueue")
private Queue queue;
(...)
context.createProducer().setDeliveryMode(DeliveryMode.NON_PERSISTENT).send(queue, msg);

我尝试更改很多东西(standalone-full.xml中的池大小,MDB消费者中的@ActivationConfigProperty注释,-D....参数(,但没有一个有效。结果总是相同的:MDB 开始同时处理许多对象,但下降到一个。

改变这种行为和/或进行更深入分析的正确方法是什么?

提前感谢!

更多信息:我尝试在"jboss-cli"应用程序中检查队列运行命令,结果如下:

一开始,有 3 个"ServerConsumer"(这是预期的行为(:

[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
consumerName=ServerConsumer [id=f3e141c0-b56d-11ea-8030-b8aeed89da5a:f3e1b6f2-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378547L
expiration=0
type=3
priority=4
userID=ID:227e8bd1-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929521797L
consumerName=ServerConsumer [id=f3ea1b75-b56d-11ea-8030-b8aeed89da5a:f3ea4287-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378523L
expiration=0
type=3
priority=4
userID=ID:f63d0c39-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447548L
consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378520L
expiration=0
type=3
priority=4
userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447299L

一段时间后,只有 2 个ServerConsumer实例,但似乎是给其中一个的两条消息:

[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
consumerName=ServerConsumer [id=f3d29bb3-b56d-11ea-8030-b8aeed89da5a:f3d66c45-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378580L
expiration=0
type=3
priority=4
userID=ID:6a77afd0-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929642548L
consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378520L
expiration=0
type=3
priority=4
userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447299L
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378565L
expiration=0
type=3
priority=4
userID=ID:34d2c6c7-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929552548L

再过一段时间,只有一个ServerConsumer实例,它似乎正在处理所有消息。此时,并发性消失了!

[standalone@localhost:9990 /] jms-queue list-delivering-messages --queue-address=MessageQueue
consumerName=ServerConsumer [id=f3e86dc2-b56d-11ea-8030-b8aeed89da5a:f3e93114-b56d-11ea-8030-b8aeed89da5a:0, filter=null, binding=LocalQueueBinding [address=jms.queue.MessageQueue, queue=QueueImpl[name=jms.queue.MessageQueue, postOffice=PostOfficeImpl [server=ActiveMQServerImpl::serverUUID=388a227e-938d-11ea-975c-b8aeed89da5a], temp=false]@524ecb7c]]
elements
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378520L
expiration=0
type=3
priority=4
userID=ID:f6170da8-b56d-11ea-8030-b8aeed89da5a
timestamp=1592929447299L
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378565L
expiration=0
type=3
priority=4
userID=ID:34d2c6c7-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929552548L
durable=false
address=jms.queue.MessageQueue
__AMQ_CID=f3d078cd-b56d-11ea-8030-b8aeed89da5a
_AMQ_ROUTING_TYPE=1
messageID=253403378610L
expiration=0
type=3
priority=4
userID=ID:9751c5ea-b56e-11ea-8030-b8aeed89da5a
timestamp=1592929717797L

我相信这是由于 MDB 底层 JMS 会话中的消息缓冲。尝试将consumerWindowSize激活配置属性设置为0,例如:

@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "java:/jms/queue/MessageQueue"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
@ActivationConfigProperty(propertyName = "consumerWindowSize", propertyValue = "0")
})
public class MessageConsumerTest implements MessageListener {
(...)
}

此设置在 ActiveMQ Artemis 文档中进一步讨论。

相关内容

  • 没有找到相关文章

最新更新