Spring JMSListener 合并消息



我使用并发线程数为 5(concurrency="5-10")的监听器实现了 Spring JMS:

<!-- Listener container: concurrency="5-10" is a lower-upper range of concurrent consumers (per the Spring JMS namespace); messages arriving on destination TEST.FOO are dispatched to the bean referenced as "messageListener". -->
<jms:listener-container container-type="default" concurrency="5-10" connection-factory="cachingConnectionFactory"   >
    <jms:listener destination="TEST.FOO" ref="messageListener" />
 </jms:listener-container>

当我向队列发送 5 条消息时,5 个线程都在监听,我可以读取到这些消息。

我的问题是如何把这 5 条消息合并起来:是否可以编写某种构建器,让它等待一段时间,并把在这段时间内收到的所有消息合并在一起?

代码:

    // Batching fragment: appends the incoming text to `line` and, once `reached`
    // is set, hands the accumulated text to execService.createExecFile.
    // NOTE(review): startTime is declared as a local here, so it is 0 on every
    // call except the one where messageCount == 0 - confirm whether it was
    // meant to be a field that survives between messages.
    long startTime = 0;
    if (messageCount == 0) {
        // First message of a batch: remember when the wait window opened.
        startTime = System.currentTimeMillis();
    }
    messageCount++;
    // Append only while the batch is not yet full and the wait window is still open.
    // NOTE(review): `reached` is set as soon as a message is appended and is never
    // cleared in this fragment - verify this is the intended "batch complete" signal.
    if (messageCount < batchSize && (startTime > (System.currentTimeMillis() - maximumBatchWaitTime))) {
        line += stringMessage;
        reached = true; // volatile flag; messageCount is volatile as well
    }
    System.out.println(line);
    try {
        if (reached) {
            // Snapshot the batch before clearing the shared state; the original
            // cleared `line` first and therefore always passed an empty string.
            final String batch = line;
            messageCount = 0;
            line = "";
            execService.createExecFile(batch);
        }
    } catch (final Exception e) {
        e.printStackTrace();
    }

此致,瑞杰

您的 messageListener 是单例,因此所有线程都会使用同一个实例。这样,您可以对该方法的调用进行同步,把每条消息追加到该实例的一个字段(例如 line)中。

更新 1

使用 org.springframework.jms.core.JmsTemplate.receive()

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import org.apache.activemq.command.ActiveMQTextMessage;
/**
 * JMS listener that merges the text of incoming messages into a single batch.
 *
 * <p>A batch is handed to {@code execService.createExecFile} when either
 * {@code batchSize} messages have been collected, or the oldest message of the
 * batch has been waiting for at least {@code maximumBatchWaitTime} milliseconds.
 *
 * <p>Thread-safety: all batch state is guarded by this instance's monitor, so
 * the listener threads and the background timer thread never observe a
 * half-updated batch.
 */
public class DefaultMessageListener implements MessageListener {
    /** How often the timer thread checks whether the wait window has expired. */
    private static final long POLL_INTERVAL_MILLIS = 200;

    // Batch state; guarded by "this".
    private String line = "";
    private int messageCount;
    private int batchSize;
    private long startTime = 0;
    private long maximumBatchWaitTime;

    /** Background timer that flushes a partially filled batch once it is old enough. */
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(POLL_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the flag and stop polling instead of spinning forever.
                    Thread.currentThread().interrupt();
                    return;
                }
                flushIfExpired();
            }
        }
    });
    {
        thread.start();
    }

    /**
     * Sets the number of messages that completes a batch.
     *
     * @param batchSize messages per batch; values {@code <= 0} disable the size trigger
     */
    public synchronized void setBatchSize(int batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * Sets how long a partially filled batch may wait before it is flushed.
     *
     * @param maximumBatchWaitTime maximum wait in milliseconds
     */
    public synchronized void setMaximumBatchWaitTime(long maximumBatchWaitTime) {
        this.maximumBatchWaitTime = maximumBatchWaitTime;
    }

    /**
     * Appends the text of {@code message} to the current batch and flushes the
     * batch when it reaches {@code batchSize} messages.
     *
     * @param message the delivered message; expected to be a {@link TextMessage}
     */
    @Override
    public synchronized void onMessage(Message message) {
        if (messageCount == 0) {
            // First message of a new batch opens the wait window.
            startTime = System.currentTimeMillis();
        }
        try {
            // Read the text before counting so a failed read does not inflate the count.
            final String text = ((TextMessage) message).getText();
            messageCount++;
            line += text;
            System.out.println(line);
            if (batchSize > 0 && messageCount >= batchSize) {
                createExecFile();
            }
        } catch (final Exception e) {
            e.printStackTrace();
        }
    }

    /** Flushes the pending batch if its wait window has elapsed; called by the timer thread. */
    private synchronized void flushIfExpired() {
        // Expired means: now - startTime >= maximumBatchWaitTime. The original test
        // was inverted and fired only while the window was still open.
        if (messageCount > 0 && (System.currentTimeMillis() - startTime) >= maximumBatchWaitTime) {
            createExecFile();
        }
    }

    /**
     * Hands the accumulated batch to {@code execService} and resets the batch state.
     * The state is reset even when the call fails, so a failed batch is dropped.
     */
    private synchronized void createExecFile() {
        try {
            execService.createExecFile(line);
        } catch (final Exception e) {
            e.printStackTrace();
        }
        messageCount = 0;
        line = "";
    }
}

更新 2

import javax.jms.Message;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.command.ActiveMQMessage;
import org.springframework.jms.core.JmsTemplate;
/**
 * Pull-based variant: receives a message through {@link JmsTemplate#receive()},
 * passes its text to {@code execService.createExecFile}, and acknowledges the
 * message individually once it has been processed.
 */
public class DefaultMessageListener {
    private volatile String line = "";
    private volatile int messageCount;
    private long maximumBatchWaitTime;
    JmsTemplate jmsTemplate;

    /**
     * Waits up to {@code maximumBatchWaitTime} milliseconds for one message and
     * processes it. Does nothing beyond printing the (empty) buffer when the
     * receive call times out.
     */
    public void getMessages() {
        try {
            // These two settings are better applied once on the jmsTemplate bean definition.
            jmsTemplate.setSessionAcknowledgeMode(ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
            jmsTemplate.setReceiveTimeout(maximumBatchWaitTime);

            Message message = jmsTemplate.receive();
            if (message != null) {
                messageCount++;
                line += ((TextMessage) message).getText();
            }
            System.out.println(line);

            boolean processed = false;
            if (messageCount > 0) {
                processed = createExecFile();
            }
            // receive() returns null on timeout, so guard the acknowledge call;
            // and acknowledge only after successful processing so that a failed
            // message stays unacknowledged and can be redelivered.
            if (message != null && processed) {
                message.acknowledge();
            }
        } catch (final Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Hands the buffered text to {@code execService} and clears the buffer.
     *
     * @return {@code true} if the call completed without throwing
     */
    private boolean createExecFile() {
        boolean succeeded = false;
        try {
            execService.createExecFile(line);
            succeeded = true;
        } catch (final Exception e) {
            e.printStackTrace();
        }
        messageCount = 0;
        line = "";
        return succeeded;
    }
}

INDIVIDUAL_ACKNOWLEDGE:当消费者再次连接时,未确认的消息将被重新传递。示例 http://alvinalexander.com/java/jwarehouse/activemq/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java.shtml

更新 3

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQSession;
import org.springframework.jms.core.JmsTemplate;
/**
 * Pull-based variant using CLIENT_ACKNOWLEDGE: a received message is passed to
 * {@code execService.createExecFile} and then acknowledged through the session;
 * on any failure the session is asked to recover so the message can be redelivered.
 *
 * NOTE(review): nothing in this class assigns {@code session}; confirm where it
 * is supposed to come from and that it is the session used by {@code jmsTemplate}.
 */
public class DefaultMessageListener {
    private volatile String line = "";
    private volatile int messageCount;
    private long maximumBatchWaitTime;
    JmsTemplate jmsTemplate;
    private ActiveMQSession session;

    /**
     * Waits up to {@code maximumBatchWaitTime} milliseconds for one message and
     * processes it.
     *
     * @throws JMSException if recovering the session after a failure fails
     */
    public void getMessages() throws JMSException {
        try {
            // These two settings are better applied once on the jmsTemplate bean definition.
            jmsTemplate.setSessionAcknowledgeMode(ActiveMQSession.CLIENT_ACKNOWLEDGE);
            jmsTemplate.setReceiveTimeout(maximumBatchWaitTime);

            Message message = jmsTemplate.receive();
            if (message != null) {
                messageCount++;
                line += ((TextMessage) message).getText();
            }
            System.out.println(line);
            if (messageCount > 0) {
                createExecFile();
            }
        } catch (final Exception e) {
            e.printStackTrace();
            // Guard against an unset session so the original failure is not
            // replaced by a NullPointerException thrown from this handler.
            if (session != null) {
                session.recover();
            }
            messageCount = 0;
            line = "";
        }
    }

    /**
     * Hands the buffered text to {@code execService}, acknowledges the session,
     * and clears the buffer.
     *
     * @throws JMSException if the acknowledgement fails
     */
    private void createExecFile() throws JMSException {
        execService.createExecFile(line);
        session.acknowledge();
        messageCount = 0;
        line = "";
    }
}

最新更新