如何使用 MessageListener 声明队列



我使用 Spring Boot 2.2,需要通过 JMS 接收消息。

我看到然后我们可以使用注释

@Component
public class JMSReceiver {
@JmsListener(destination = "queue")
public void receiveMessage(String message) {
System.out.println("Received <" + message + ">");
}
}

实现MessageListener

@Component
public class JMSReceiver implements MessageListener {
@Override
public void onMessage(Message message) {
...    
}
}

有了MessageListener我们如何声明我们使用的队列?

您不会在MessageListener实现上声明队列,而是在MessageListenerContainer上声明队列。

例如:

@Bean
public MessageListenerContainer messageListenerContainer() {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setDestinationName("queue"); // Set the queue name here.
container.setMessageListener(jmsReceiver()); // Your JMS receiver message listener.
return container;
}

消息侦听器接口只有一个方法 "onMessage"。

public interface MessageListener {
void onMessage(Message message);
} 

一旦消息到达目的地,消息使用者就会通过调用消息侦听器的 onMessage(( 方法来传递它们。注册消息侦听器允许客户端异步接收消息,而无需阻止/轮询消息使用者。所以我们要在"消息消费者"中注册"消息监听器",因为消息是由消息消费者接收的,所以队列设置在"消息消费者"中(下面的例子演示了一下(下面是"消息消费者"接口:

public interface MessageConsumer {
String getMessageSelector() throws JMSException;
MessageListener getMessageListener() throws JMSException;
void setMessageListener(MessageListener listener) throws JMSException;
Message receive() throws JMSException;
Message receive(long timeout) throws JMSException;
Message receiveNoWait() throws JMSException;
void close() throws JMSException;
}

所以下面是可以帮助您的代码,

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
public class ConsumerMessageListener implements MessageListener {
private String consumerName;
public ConsumerMessageListener(String consumerName) {
this.consumerName = consumerName;
}
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println(consumerName + " received "
+ textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}

队列创建在此处完成,消息从此处发送。

import java.net.URI;
import java.net.URISyntaxException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
public class JmsMessageListenerExample {
public static void main(String[] args) throws URISyntaxException, Exception {
BrokerService broker = BrokerFactory.createBroker(new URI(
"broker:(tcp://localhost:61616)"));
broker.start();
Connection connection = null;
try {
// Producer
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
"tcp://localhost:61616");
connection = connectionFactory.createConnection();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue("customerQueue");
String payload = "Important Task";
Message msg = session.createTextMessage(payload);
MessageProducer producer = session.createProducer(queue);
System.out.println("Sending text '" + payload + "'");
producer.send(msg);
// Consumer
MessageConsumer consumer = session.createConsumer(queue);
consumer.setMessageListener(new ConsumerMessageListener("Consumer"));
connection.start();
Thread.sleep(1000);
session.close();
} finally {
if (connection != null) {
connection.close();
}
broker.stop();
}
}
}

从控制器发送消息并使用 Spring 引导接收:

@RestController
@RequestMapping("/transaction")
public class BackOfficeController {
@Autowired private JmsTemplate jmsTemplate;
@PostMapping("/send")
public void send(@RequestBody BackOfficeVO transaction) {
System.out.println("Sending a transaction.");
// Post message to the message queue named "BackOfficeTransactionQueue"
jmsTemplate.convertAndSend("BackOfficeTransactionQueue", transaction);
}
}
@Component
public class BackOfficeReceiver {
@Autowired
private BackOfficeTransactionRepository transactionRepository;
@JmsListener(destination = "BackOfficeTransactionQueue", containerFactory = "myFactory")
public void receiveMessage(BackOfficeVO transaction) {
System.out.println("Received <" + transaction + ">");
transactionRepository.save(transaction);
}
}

要加起来 如果您使用的是现有消息侦听器,您可以在 bean 中定义它:

<bean id="messageListener" class="ConsumerMessageListener"/>

<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="destination" ref="destination"/>
<property name="messageListener" ref="messageListener"/>
</bean>

源: https://examples.javacodegeeks.com/enterprise-java/jms/jms-messagelistener-example/https://dzone.com/articles/using-jms-in-spring-boot-1 https://docs.spring.io/spring/docs/5.1.7.RELEASE/spring-framework-reference/integration.html#jms-receiving-async

最新更新