我想按顺序从Websphere MQ分组读取10000条消息,我使用下面的代码来执行相同的操作,但是读取所有消息需要很长时间。甚至我尝试使用多线程概念,但有时2个线程正在消耗相同的组和竞争条件发生。下面是代码片段。我试图使用3个线程依次从MQ读取10000条消息,但是我的两个线程一次访问同一个组。如何避免这种情况?按顺序读取大量消息的最佳方法是什么?我的要求是我要按顺序读取10000条消息。请帮助。
MQConnectionFactory factory = new MQConnectionFactory();
factory.setQueueManager("QM_host")
MQQueue destination = new MQQueue("default");
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageConsumer lastMessageConsumer =
session.createConsumer(destination, "JMS_IBM_Last_Msg_In_Group=TRUE");
TextMessage lastMessage = (TextMessage) lastMessageConsumer.receiveNoWait();
lastMessageConsumer.close();
if (lastMessage != null) {
int groupSize = lastMessage.getIntProperty("JMSXGroupSeq");
String groupId = lastMessage.getStringProperty("JMSXGroupID");
boolean failed = false;
for (int i = 1; (i < groupSize) && !failed; i++) {
MessageConsumer consumer = session.createConsumer(destination,
"JMSXGroupID='" + groupId + "'AND JMSXGroupSeq=" + i);
TextMessage message = (TextMessage)consumer.receiveNoWait();
if (message != null) {
System.out.println(message.getText());
} else {
failed = true;
}
consumer.close();
}
if (failed) {
session.rollback();
} else {
System.out.println(lastMessage.getText());
session.commit();
}
}
connection.close();
我认为更好的方法是在应用程序中有一个协调器线程,它将侦听组的最后消息,并且每个线程将启动一个新线程以获取属于分配给该线程的组中的消息。(这将满足竞争条件。)
在获取属于一个组的消息的线程中,您不需要使用for循环分别获取每个消息,相反,您应该获取属于该组的任何消息,同时维护一个组计数器并缓冲乱序消息。只要您在接收和处理组的所有消息后才提交会话,这将是安全的。(这将产生更高的性能,因为每个组将由一个单独的线程处理,并且该线程在MQ中只访问每条消息一次。)
请参阅IBM关于消息顺序检索的文档。如果页面移动或更改,我将引用最相关的部分。为了保证顺序处理,必须满足以下条件:
- 所有的put请求都来自同一个应用程序。
- 所有的put请求要么来自同一个工作单元,要么所有的put请求都是在一个工作单元之外发出的。
- 所有消息的优先级相同。
- 所有消息都具有相同的持久性。
- 对于远程队列,配置是这样的:从发出put请求的应用程序到其队列管理器,通过相互通信,到目标队列
- 消息不会被放入死信队列(例如,如果队列暂时已满)。
- 获取消息的应用程序不会故意改变检索顺序,例如通过指定特定的MsgId或使用消息优先级。
- 只有一个应用程序正在执行get操作以从目标队列检索消息。如果不止一个应用程序,这些应用程序必须被设计为获得所有的
虽然页面没有明确说明这一点,但当他们说"一个应用程序"时,所指的是该应用程序的单个线程。如果应用程序有并发线程,则不能保证处理顺序。
此外,在另一个响应中建议的在单个工作单元中读取10,000条消息是而不是建议作为保持消息顺序的一种手段!只有当10,000条消息必须作为一个原子单元成功或失败时才这样做,这与它们是否按顺序接收无关。如果必须在单个工作单元中处理大量消息,则绝对有必要调整日志文件的大小,很可能还需要调整其他一些参数。保持序列顺序对于任何线程异步消息传输来说都是足够折磨人的,而且还会引入运行很长时间的大量事务。
您可以使用Java(非JMS)的MQ类做您想做的事情,也可以使用JMS的MQ类,但确实很棘手。
首先从MQ知识中读取该页。
我将伪代码(来自上面的网页)转换为Java的MQ类,并将其从浏览更改为破坏性获取。
另外,我更喜欢在一个同步点下处理每组消息(假设有一个合理大小的组)。
首先,您缺少GMO (GetMessageOptions)的'options'字段的几个标志,MatchOptions字段需要设置为'MQMO_MATCH_MSG_SEQ_NUMBER',以便所有线程始终在组中获取第一条消息。也就是说,不要像你上面说的那样,为第一条消息抓取组中的第二条消息。
MQGetMessageOptions gmo = new MQGetMessageOptions();
MQMessage rcvMsg = new MQMessage();
/* Get the first message in a group, or a message not in a group */
gmo.Options = CMQC.MQGMO_COMPLETE_MSG | CMQC.MQGMO_LOGICAL_ORDER | CMQC.MQGMO_ALL_MSGS_AVAILABLE | CMQC.MQGMO_WAIT | CMQC.MQGMO_SYNCPOINT;
gmo.MatchOptions = CMQC.MQMO_MATCH_MSG_SEQ_NUMBER;
rcvMsg.messageSequenceNumber = 1;
inQ.get(rcvMsg, gmo);
/* Examine first or only message */
...
gmo.Options = CMQC.MQGMO_COMPLETE_MSG | CMQC.MQGMO_LOGICAL_ORDER | CMQC.MQGMO_SYNCPOINT;
do while ((rcvMsg.messageFlags & CMQC.MQMF_MSG_IN_GROUP) == CMQC.MQMF_MSG_IN_GROUP)
{
rcvMsg.clearMessage();
inQ.get(rcvMsg, gmo);
/* Examine each remaining message in the group */
...
}
qMgr.commit();