我需要在所有JMS侦听器上应用一些预检查和常见步骤,例如验证针对模式的原始消息(JSON Schema(。示例 -
@Component
public class MyService {
@JmsListener(destination = "myDestination")
public void processOrder(Order order) { ... }
}
现在,在春季将消息从队列转换为订单之前,我需要做以下 -
- 将原始消息与标题记录到自定义记录器中。
- 针对JSON模式验证JSON消息(文本消息((假设我在这里只有一个模式(
- 如果模式验证失败,请记录错误并投掷异常
- 如果模式验证通过,请继续控制弹簧以进行转换并继续使用过程顺序方法。
Spring JMS体系结构是否提供了注入上述需求的任何方法?我知道AOP跨越了头脑,但我不确定它会与@jmslistener一起使用。
一种相当简单的技术是将autoStartup
设置为侦听器容器工厂的false
。
然后,使用JmsListenerEndpointRegistry
bean获取侦听器容器。
然后getMessageListener()
,将其包装在AOP代理和setMessageListener()
中。
然后启动容器。
可能有一种更优雅的方式,但我认为您必须涉足听众创建代码的胆量,这很涉及。
编辑
带有Spring Boot的示例:
@SpringBootApplication
public class So49682934Application {
private final Logger logger = LoggerFactory.getLogger(getClass());
public static void main(String[] args) {
SpringApplication.run(So49682934Application.class, args);
}
@JmsListener(id = "listener1", destination = "so49682934")
public void listen(Foo foo) {
logger.info(foo.toString());
}
@Bean
public ApplicationRunner runner(JmsListenerEndpointRegistry registry, JmsTemplate template) {
return args -> {
DefaultMessageListenerContainer container =
(DefaultMessageListenerContainer) registry.getListenerContainer("listener1");
Object listener = container.getMessageListener();
ProxyFactory pf = new ProxyFactory(listener);
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(new MyJmsInterceptor());
advisor.addMethodName("onMessage");
pf.addAdvisor(advisor);
container.setMessageListener(pf.getProxy());
registry.start();
Thread.sleep(5_000);
Foo foo = new Foo("baz");
template.convertAndSend("so49682934", foo);
};
}
@Bean
public MessageConverter converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("typeId");
return converter;
}
public static class MyJmsInterceptor implements MethodInterceptor {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Message message = (Message) invocation.getArguments()[0];
logger.info(message.toString());
// validate
return invocation.proceed();
}
}
public static class Foo {
private String bar;
public Foo() {
super();
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
和
spring.jms.listener.auto-startup=false
和
M2018-04-06 11:42:04.859信息59745 --- [EnerContainer-1] e.so49682934Application $ myJMSInterpector:activemqtextMessage {commandId = 5,pressiveErired = 5,pressiveerequired = id yid = id = id = id = id =:-4:2:1:1:1,ointrimentDestination = null,OriginalTranSactionId = null,ProducerId = ID:Gollum.Local-60138-1523029319662-4:2:2:2:1:1:1:1:1,destination = queue://so49682934,transactionid = null = null = null = null = null= null,compressed = false,userId = null,content = null,marshalledProperties = null,dataCtructure = null,redeliveryCounter = 0,size = 1050,properties = {typeId = com.example.so496829334application $ foo},readonlyproperties = realllyproperties = true true,= true,droppable = false,jmsxgroupfirstforconsumer = false,text = {" bar":&quort; baz;}}}
2018-04-06 11:42:04.882信息59745 --- [enercontainer-1] ichication $$ EnhancerBysBysBysBySpringCglib $$ e29327b8:foo [bar = baz]
>
edit2
这是通过基础架构进行操作的方法...
@SpringBootApplication
@EnableJms
public class So496829341Application {
private final Logger logger = LoggerFactory.getLogger(getClass());
public static void main(String[] args) {
SpringApplication.run(So496829341Application.class, args);
}
@JmsListener(id = "listen1", destination="so496829341")
public void listen(Foo foo) {
logger.info(foo.toString());
}
@Bean
public ApplicationRunner runner(JmsTemplate template) {
return args -> {
Thread.sleep(5_000);
template.convertAndSend("so496829341", new Foo("baz"));
};
}
@Bean
public MessageConverter converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("typeId");
return converter;
}
@Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
public static JmsListenerAnnotationBeanPostProcessor bpp() {
return new JmsListenerAnnotationBeanPostProcessor() {
@Override
protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
return new MethodJmsListenerEndpoint() {
@Override
protected MessagingMessageListenerAdapter createMessageListener(
MessageListenerContainer container) {
MessagingMessageListenerAdapter listener = super.createMessageListener(container);
ProxyFactory pf = new ProxyFactory(listener);
pf.setProxyTargetClass(true);
NameMatchMethodPointcutAdvisor advisor = new NameMatchMethodPointcutAdvisor(new MyJmsInterceptor());
advisor.addMethodName("onMessage");
pf.addAdvisor(advisor);
return (MessagingMessageListenerAdapter) pf.getProxy();
}
};
}
};
}
public static class MyJmsInterceptor implements MethodInterceptor {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
Message message = (Message) invocation.getArguments()[0];
logger.info(message.toString());
// validate
return invocation.proceed();
}
}
public static class Foo {
private String bar;
public Foo() {
super();
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
注意:BPP必须是静态的,并且需要@EnableJms
,因为此BPP的存在会禁用启动。
2018-04-06 13:44:41.607信息82669 --- [EnerContainer-1] .SO496829341Application $ myJMSInterpector:activemqtextMessage {commandId = 5,prosessired = 5,prossigeReed = 5,pression = true = id = id = id = id = id = id = id =:4:2:1:1:1,ointrimentDestation = null,OriginalTranSactionId = null,producerId = id:gollum.local-63685-1523036676402-4:2:2:1:1:1:1,destination = queue://so496829341,transactionid = null,transactionId = null,transactionid = null,null,null,null,null,dullnull,compressed = false,userId = null,content = null,marshalledproperties = null,dataCtructure = null,redeliveryCounter = 0,size = 1050,properties = {typeId = com.example.so496829341application $ foo},readonlyproperties = realllyproperties = true true = true,是的,drowpable = false,jmsxgroupfirstforconsumer = false,text = {&quord; bar&quort" baz;}}}}
2018-04-06 13:44:41.634信息82669 --- [enercontainer-1] ichication $$ EnhancerBysBysBysBysPringCglib $$ 9ff4b13f:foo [bar = baz]
>
edit3
避免AOP ...
@SpringBootApplication
@EnableJms
public class So496829341Application {
private final Logger logger = LoggerFactory.getLogger(getClass());
public static void main(String[] args) {
SpringApplication.run(So496829341Application.class, args);
}
@JmsListener(id = "listen1", destination="so496829341")
public void listen(Foo foo) {
logger.info(foo.toString());
}
@Bean
public ApplicationRunner runner(JmsTemplate template) {
return args -> {
Thread.sleep(5_000);
template.convertAndSend("so496829341", new Foo("baz"));
};
}
@Bean
public MessageConverter converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setTypeIdPropertyName("typeId");
return converter;
}
@Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
public static JmsListenerAnnotationBeanPostProcessor bpp() {
return new JmsListenerAnnotationBeanPostProcessor() {
@Override
protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
return new MethodJmsListenerEndpoint() {
@Override
protected MessagingMessageListenerAdapter createMessageListener(
MessageListenerContainer container) {
final MessagingMessageListenerAdapter listener = super.createMessageListener(container);
return new MessagingMessageListenerAdapter() {
@Override
public void onMessage(Message jmsMessage, Session session) throws JMSException {
logger.info(jmsMessage.toString());
// validate
listener.onMessage(jmsMessage, session);
}
};
}
};
}
};
}
public static class Foo {
private String bar;
public Foo() {
super();
}
public Foo(String bar) {
this.bar = bar;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
}
edit4
要访问侦听器方法上的其他注释,可以做到,但是需要反射才能获得Method
...
@JmsListener(id = "listen1", destination="so496829341")
@Schema("foo.bar")
public void listen(Foo foo) {
logger.info(foo.toString());
}
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface Schema {
String value();
}
@Bean(JmsListenerConfigUtils.JMS_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
public static JmsListenerAnnotationBeanPostProcessor bpp() {
return new JmsListenerAnnotationBeanPostProcessor() {
@Override
protected MethodJmsListenerEndpoint createMethodJmsListenerEndpoint() {
return new MethodJmsListenerEndpoint() {
@Override
protected MessagingMessageListenerAdapter createMessageListener(
MessageListenerContainer container) {
final MessagingMessageListenerAdapter listener = super.createMessageListener(container);
InvocableHandlerMethod handlerMethod =
(InvocableHandlerMethod) new DirectFieldAccessor(listener)
.getPropertyValue("handlerMethod");
final Schema schema = AnnotationUtils.getAnnotation(handlerMethod.getMethod(), Schema.class);
return new MessagingMessageListenerAdapter() {
@Override
public void onMessage(Message jmsMessage, Session session) throws JMSException {
logger.info(jmsMessage.toString());
logger.info(schema.value());
// validate
listener.onMessage(jmsMessage, session);
}
};
}
};
}
};
}