使用Apache Camel ConsumerTemplate从ActiveMQ Artemis轮询字节/大消息



我在通过JMS连接到ActiveMQ Artemis时,在基于Apache Camel的应用程序中遇到了一个问题。在一条骆驼路由的末尾,消息存储在Artemis JMS队列中。在同一应用程序中运行的遗留组件使用ConsumerTemplate定期从那里拾取它们。

这适用于具有纯文本正文的 Camel 消息,但在使用字节数组正文时会导致错误:似乎 Artemis 将任何带有字节体的消息视为"大消息",这些消息被流式传输而不是保存在内存中。通过ConsumerTemplate接收有效,但一旦访问正文或标头,就会引发如下异常:

org.apache.camel.RuntimeCamelException: Failed to extract body due to: javax.jms.IllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session. Message: ActiveMQMessage[ID:90c4d1d5-3233-11ea-b0cc-44032c68a56f]:PERSISTENT/ClientLargeMessageImpl[messageID=2974, durable=true, address=mytest,userID=90c4d1d5-3233-11ea-b0cc-44032c68a56f,properties=TypedProperties[firedTime=Wed Jan 08 17:26:03 CET 2020,__AMQ_CID=90b4f34e-3233-11ea-b0cc-44032c68a56f,breadcrumbId=ID-NB045-evolit-co-at-1578500762151-0-1,_AMQ_ROUTING_TYPE=1,_AMQ_LARGE_SIZE=3]]
at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:172) ~[camel-jms-2.22.1.jar:2.22.1]
at org.apache.camel.component.jms.JmsMessage.createBody(JmsMessage.java:221) ~[camel-jms-2.22.1.jar:2.22.1]
at org.apache.camel.impl.MessageSupport.getBody(MessageSupport.java:54) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.example.cdi.JmsPoller.someMethod(JmsPoller.java:36) ~[classes/:?]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_171]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_171]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_171]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_171]
at org.apache.camel.component.bean.MethodInfo.invoke(MethodInfo.java:481) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:300) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:273) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.component.bean.AbstractBeanProcessor.process(AbstractBeanProcessor.java:188) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:53) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.component.bean.BeanProducer.process(BeanProducer.java:41) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:148) ~[camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:548) [camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:201) [camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.component.timer.TimerConsumer.sendTimerExchange(TimerConsumer.java:197) [camel-core-2.22.1.jar:2.22.1]
at org.apache.camel.component.timer.TimerConsumer$1.run(TimerConsumer.java:79) [camel-core-2.22.1.jar:2.22.1]
at java.util.TimerThread.mainLoop(Timer.java:555) [?:1.8.0_171]
at java.util.TimerThread.run(Timer.java:505) [?:1.8.0_171]
Caused by: javax.jms.IllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session
at org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:273) ~[artemis-core-client-2.6.2.jar:2.6.2]
at org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.saveToOutputStream(ClientLargeMessageImpl.java:115) ~[artemis-core-client-2.6.2.jar:2.6.2]
at org.apache.activemq.artemis.jms.client.ActiveMQMessage.saveToOutputStream(ActiveMQMessage.java:853) ~[artemis-jms-client-2.6.2.jar:2.6.2]
at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setObjectProperty(ActiveMQMessage.java:693) ~[artemis-jms-client-2.6.2.jar:2.6.2]
at org.apache.camel.component.jms.JmsBinding.createByteArrayFromBytesMessage(JmsBinding.java:251) ~[camel-jms-2.22.1.jar:2.22.1]
at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:163) ~[camel-jms-2.22.1.jar:2.22.1]
... 21 more
Caused by: org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException: AMQ119023: The large message lost connection with its session, either because of a rollback or a closed session
at org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:273) ~[artemis-core-client-2.6.2.jar:2.6.2]
at org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.saveToOutputStream(ClientLargeMessageImpl.java:115) ~[artemis-core-client-2.6.2.jar:2.6.2]
at org.apache.activemq.artemis.jms.client.ActiveMQMessage.saveToOutputStream(ActiveMQMessage.java:853) ~[artemis-jms-client-2.6.2.jar:2.6.2]
at org.apache.activemq.artemis.jms.client.ActiveMQMessage.setObjectProperty(ActiveMQMessage.java:693) ~[artemis-jms-client-2.6.2.jar:2.6.2]
at org.apache.camel.component.jms.JmsBinding.createByteArrayFromBytesMessage(JmsBinding.java:251) ~[camel-jms-2.22.1.jar:2.22.1]
at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:163) ~[camel-jms-2.22.1.jar:2.22.1]
... 21 more

对于不超过 ArtemisminLargeMessageSize的消息,即使在 3 个字节的测试程序中,也会出现此问题。

巧合的是,用于测试应用程序的独立应用程序中也出现了相同的问题。在那里,我能够通过保持 JMS 会话和接收器打开状态来解决此问题,直到完全读取 JMS 消息正文和标头。对于骆驼,这在骆驼所基于的春季JmsTemplate中被抽象化了。


我查阅了 Camel JMS 组件的用户文档,以找到可能对我有帮助的配置选项。我尝试了以下方法:

  • eagerLoadingOfProperties=true消费者方面:没有效果,似乎只影响MessageListenerContainer。文档说:

它使用 [...]Spring 的 JmsTemplate 用于发送,MessageListenerContainer 用于消费。

但是,在调试时,似乎仅在使用来自 Camel 路由中的 JMS 端点的消息时才使用MessageListenerContainer。像我的情况一样使用ConsumerTemplate使用JmsTemplate进行消费。

  • 消费者端的messageConvertermapJmsMessage:无效,它们在会话已经关闭时执行
  • alwaysCopyMessage在制作人方面:我认为复制可能会阻止使用流式传输的大消息,没有效果
  • 生产者方面streamMessageTypeEnabled=false:无效果
  • 生产者和消费者jmsMessageType=Bytes:无影响
  • transferExchange=true生产者和消费者方面:这似乎确实解决了我的具体情况,但感觉像是一种解决方法。文档建议谨慎使用该选项。

所以现在,transferExchange似乎是我最好的选择,假设它确实解决了我在所有测试用例中的问题。尽管如此,我很高兴对这个问题或不同的解决方案有更好的理解:

  1. 为什么Artemis无论如何都要将小字节数组消息视为大消息?
  2. 骆驼消费者模板是否支持流式传输大型消息?

我的版本是骆驼2.22.1和Artemis 2.10.1。


我已经能够通过修改 Camel 发布包中的 Camel 示例camel-example-cdi来重现我的问题,以具有如下所示的最小类。 此外,我还添加了camel-jms和Artemis依赖项,并在本地启动了Artemis,两者都如camel-example-artemis-large-messages示例中所述。

public class MyRoutes extends RouteBuilder {
@Override
public void configure() {
setupJmsComponent();
from("timer:writeTimer?period=6000")
.log("writing to JMS")
.setBody(() -> new byte[]{0,1,2})
.to(JmsPoller.ENDPOINT);
from("timer:pollTimer?period=3000")
.to("bean:jmsPoller");
}
private void setupJmsComponent() {
ActiveMQJMSConnectionFactory connectionFactory = new ActiveMQJMSConnectionFactory("tcp://localhost:61616");
JmsComponent jmsComponent = new JmsComponent();
jmsComponent.setConnectionFactory(connectionFactory);
getContext().addComponent("jms", jmsComponent);
}
}
@Singleton
@Named("jmsPoller")
public class JmsPoller {
static final String ENDPOINT = "jms:queue:mytest";
@Inject
private ConsumerTemplate consumerTemplate;
public void someMethod(String body) {
Exchange exchange = consumerTemplate.receive(ENDPOINT, 1000L);
System.out.println("Received " + (exchange == null ? null : exchange.getIn().getBody()));
}
}

ActiveMQ Artemis不会将任何带有字节体的消息视为"大"消息。值得注意的是,代理最终将所有消息正文视为字节数组,因为这正是它们。但是,为了被视为"大"消息必须超过特定大小。文档指出:

任何大于特定大小的消息都被视为大消息。大型消息将被拆分并分段发送。这是由 URL 参数minLargeMessageSize确定的。

Apache ActiveMQ Artemis消息使用每个字符2个字节进行编码,因此如果消息数据填充ASCII字符(1字节(,则生成的Apache ActiveMQ Artemis消息的大小大约会翻倍。这在计算"大"消息的大小时很重要,因为它可能看起来小于发送前的minLargeMessageSize,但一旦编码,它就会变成"大"消息。

默认值为 100KiB。

看起来应用程序的用例根本不适合ActiveMQ Artemis中大型消息支持的语义,因为消息来自的会话在消息正文完全接收之前就被关闭了。

因此,我建议您在读取正文之前保持会话打开状态,或者增加发送消息的应用程序的 URL 上的minLargeMessageSize,以便任何消息都不会被视为"大"。后一个选项可能会导致代理上的内存使用量增加,因为整个消息正文将立即保存在内存中。

相关内容

  • 没有找到相关文章

最新更新