使用并发"5"线程的侦听器实现Spring JMS,
<jms:listener-container container-type="default" concurrency="5-10" connection-factory="cachingConnectionFactory" >
<jms:listener destination="TEST.FOO" ref="messageListener" />
</jms:listener-container>
当我丢弃 5 条消息时,5 个线程正在侦听,我可以读取消息。
我的问题是如何合并所有 5 条消息,是否可以编写一些构建器,构建器可以等待一段时间,那么当在该时间内收到任何消息时,我可以合并所有消息?
法典:
long startTime = 0;
if (messageCount == 0) {
startTime = System.currentTimeMillis();
}
messageCount ++;
if (messageCount < batchSize && (startTime > (System.currentTimeMillis() - maximumBatchWaitTime))) { // Or if some batch timelimit say
line += stringMessage;
reached = true; // this is volatile variable ,messageCount also volatile variable
}
System.out.println(line);
try {
if (reached) {
messageCount = 0;
line = "";
execService.createExecFile(line);
}
} catch (final Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
问候瑞杰
您的messageListener
是单例,因此所有线程都将使用相同的实例,就像这样,您可以同步对方法的调用,以将消息作为该实例上的字段附加到新行。
更新 1
use org.springframework.jms.core.JmsTemplate.receive()
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
public class DefaultMessageListener implements MessageListener {
private volatile String line = "";
private volatile int messageCount;
private int batchSize;
private volatile long startTime = 0;
private long maximumBatchWaitTime;
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
if (messageCount > 0 && (startTime > (System.currentTimeMillis() - maximumBatchWaitTime))) {
createExecFile();
}
}
}
});
{
thread.start();
}
@Override
public synchronized void onMessage(Message message) {
if (messageCount == 0) {
startTime = System.currentTimeMillis();
}
try {
messageCount++;
line += ((TextMessage) message).getText();
System.out.println(line);
// (startTime > (System.currentTimeMillis() - maximumBatchWaitTime))
// why to do this ?? not needed i think
if (messageCount == batchSize) {
createExecFile();
}
} catch (final Exception e) {
e.printStackTrace();
}
}
private void createExecFile() {
try {
execService.createExecFile(line);
} catch (final Exception e) {
e.printStackTrace();
}
messageCount = 0;
line = "";
}
}
更新 2
import javax.jms.Message;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQMessage;
import org.springframework.jms.core.JmsTemplate;
public class DefaultMessageListener {
private volatile String line = "";
private volatile int messageCount;
private long maximumBatchWaitTime;
JmsTemplate jmsTemplate;
public void getMessages() {
try {
// configure bean jmsTemplate like this on definition
jmsTemplate.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
jmsTemplate.setReceiveTimeout(maximumBatchWaitTime);
//
Message message = jmsTemplate.receive();
if (message != null) {
messageCount++;
line += ((TextMessage) message).getText();
}
System.out.println(line);
if (messageCount > 0) {
createExecFile();
}
message.acknowledge();
} catch (final Exception e) {
e.printStackTrace();
}
}
private void createExecFile() {
try {
execService.createExecFile(line);
} catch (final Exception e) {
e.printStackTrace();
}
messageCount = 0;
line = "";
}
}
INDIVIDUAL_ACKNOWLEDGE:当消费者再次连接时,未确认的消息将被重新传递。示例 http://alvinalexander.com/java/jwarehouse/activemq/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java.shtml
更新 3
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQSession;
import org.springframework.jms.core.JmsTemplate;
public class DefaultMessageListener {
private volatile String line = "";
private volatile int messageCount;
private long maximumBatchWaitTime;
JmsTemplate jmsTemplate;
private ActiveMQSession session;
public void getMessages() throws JMSException {
try {
// configure bean jmsTemplate like this on definition
jmsTemplate.setSessionAcknowledgeMode(ActiveMQSession.CLIENT_ACKNOWLEDGE);
jmsTemplate.setReceiveTimeout(maximumBatchWaitTime);
//
Message message = jmsTemplate.receive();
if (message != null) {
messageCount++;
line += ((TextMessage) message).getText();
}
System.out.println(line);
if (messageCount > 0) {
createExecFile();
}
} catch (final Exception e) {
e.printStackTrace();
session.recover();
messageCount = 0;
line = "";
}
}
private void createExecFile() {
execService.createExecFile(line);
session.acknowledge();
messageCount = 0;
line = "";
}
}