试图在ActiveMQ集群之间压缩jms ObjectMessages,序列化异常.如何强制执行ByteMessages



你好

我正在开发一个大型ActiveMQ消息中心。许多应用程序通过我们的集线器将消息发送到各种ActiveMQ集群,我们的工作是路由这些消息(就像邮政服务一样)。我们希望利用Camel将消息从一个ActiveMQ集群路由到另一个集群。我们希望Camel做的部分工作是压缩集群之间的消息。

然而,大多数消息都是以ObjectMessages的形式传递的,我们无法访问jar文件(更确切地说:既不关心对象文件,也不关心数据,我们只想压缩数据并将其移动到另一个代理)。就像邮政服务一样,我们不感兴趣,也不被允许阅读这些信息,只是传递它们。

当我尝试从一个ActiveMQ代理读取、压缩消息并将其发送到另一个代理时,我会得到一个ClassCastException,表明我的可序列化类在类路径中不存在。

我不希望Camel/ActiveMQ将对象序列化回Java对象,相反,我希望它只是将消息读取为字节并压缩这些字节Camel是否可能从ActiveMQ/JMS中读取并认为它只是一个字节/流/gobligook的blob

我得到的最接近的方法是指定jmsMessageType=Bytes,如下所示,但它不起作用。我试着在上找到答案http://camel.apache.org/jms但我找不到解决办法。

我试着把一条短信排队,它运行得很好。但是,这必须适用于每种JMS消息类型,包括ObjectMessages。

我使用的是Redhat A-MQ 6.3、Camel 2.17.0,位于Karaf 2.4.0、ActiveMQ 5.11.0中。

如果你能通过提供XML DSL示例版本/代码片段而不是Java DSL来帮助我(我对Camel很陌生,还不习惯在这些版本之间进行翻译),你会得到"额外的分数"(或者至少我会很高兴),但任何帮助都是张开双臂接受的!

非常感谢您提供的任何想法或解决方案!

我的骆驼文件是

<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://camel.apache.org/schema/spring
http://camel.apache.org/schema/spring/camel-spring.xsd">
<bean id="jmsone" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://localhost" />
<property name="userName" value="admin" />
<property name="password" value="admin" />
</bean>
</property>
</bean>
<bean id="jmstwo" class="org.apache.camel.component.jms.JmsComponent">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="tcp://numbertwo:61616" />
<property name="userName" value="admin" />
<property name="password" value="admin" />
</bean>
</property>
</bean>
<camelContext xmlns="http://camel.apache.org/schema/spring">
<dataFormats>
<gzip id="compressor"/>
</dataFormats>
<route id="queueToOtherBroker">
<from uri="jmsone:andersPanders?jmsMessageType=Bytes" />
<marshal ref="compressor" />
<to uri="jmstwo:olleBandola" />
</route>
</camelContext>
</beans>

一号和二号地址是Docker容器,连接不是问题,它们可以相互通信。

以下是完整的例外

2017-01-27 14:54:43,003 | WARN  | r[andersPanders] | EndpointMessageListener          | rg.apache.camel.util.CamelLogger  213 | 192 - org.apache.camel.camel-core - 2.17.0.redhat-630187 | Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - Failed to extract body due to: javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Anders not found by org.apache.activemq.activemq-osgi [162]. Message: ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:VeryGoodTea-42317-1485528882333-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:VeryGoodTea-42317-1485528882333-1:1:1:1, destination = queue://andersPanders, transactionId = null, expiration = 0, timestamp = 1485528882925, arrival = 0, brokerInTime = 1485528882928, brokerOutTime = 1485528882944, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@57958b, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1109, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}]
org.apache.camel.RuntimeCamelException: Failed to extract body due to: javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Anders not found by org.apache.activemq.activemq-osgi [162]. Message: ActiveMQObjectMessage {commandId = 5, responseRequired = true, messageId = ID:VeryGoodTea-42317-1485528882333-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:VeryGoodTea-42317-1485528882333-1:1:1:1, destination = queue://andersPanders, transactionId = null, expiration = 0, timestamp = 1485528882925, arrival = 0, brokerInTime = 1485528882928, brokerOutTime = 1485528882944, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@57958b, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1109, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:160)[206:org.apache.camel.camel-jms:2.17.0.redhat-630187]
at org.apache.camel.component.jms.JmsMessage.createBody(JmsMessage.java:236)[206:org.apache.camel.camel-jms:2.17.0.redhat-630187]
at org.apache.camel.impl.MessageSupport.getBody(MessageSupport.java:47)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.impl.DefaultUnitOfWork.<init>(DefaultUnitOfWork.java:90)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.impl.DefaultUnitOfWork.<init>(DefaultUnitOfWork.java:72)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.impl.DefaultUnitOfWorkFactory.createUnitOfWork(DefaultUnitOfWorkFactory.java:34)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.createUnitOfWork(CamelInternalProcessor.java:683)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:651)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.processor.CamelInternalProcessor$UnitOfWorkProcessorAdvice.before(CamelInternalProcessor.java:628)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:144)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:109)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:91)[192:org.apache.camel.camel-core:2.17.0.redhat-630187]
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:112)[206:org.apache.camel.camel-jms:2.17.0.redhat-630187]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:555)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:515)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:485)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:325)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:263)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1103)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1095)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:992)[205:org.apache.servicemix.bundles.spring-jms:3.2.16.RELEASE_2]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_121]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_121]
at java.lang.Thread.run(Thread.java:745)[:1.8.0_121]
Caused by: javax.jms.JMSException: Failed to build body from content. Serializable class not available to broker. Reason: java.lang.ClassNotFoundException: Anders not found by org.apache.activemq.activemq-osgi [162]
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:36)[162:org.apache.activemq.activemq-osgi:5.11.0.redhat-630187]
at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:208)[162:org.apache.activemq.activemq-osgi:5.11.0.redhat-630187]
at org.apache.camel.component.jms.JmsBinding.extractBodyFromJms(JmsBinding.java:135)[206:org.apache.camel.camel-jms:2.17.0.redhat-630187]
... 23 more
Caused by: java.lang.ClassNotFoundException: Anders not found by org.apache.activemq.activemq-osgi [162]
at org.apache.felix.framework.BundleWiringImpl.findClassOrResourceByDelegation(BundleWiringImpl.java:1556)[org.apache.felix.framework-4.4.1.jar:]
at org.apache.felix.framework.BundleWiringImpl.access$400(BundleWiringImpl.java:77)[org.apache.felix.framework-4.4.1.jar:]
at org.apache.felix.framework.BundleWiringImpl$BundleClassLoader.loadClass(BundleWiringImpl.java:1993)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)[:1.8.0_121]
at java.lang.Class.forName0(Native Method)[:1.8.0_121]
at java.lang.Class.forName(Class.java:348)[:1.8.0_121]
at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.load(ClassLoadingAwareObjectInputStream.java:140)[162:org.apache.activemq.activemq-osgi:5.11.0.redhat-630187]
at org.apache.activemq.util.ClassLoadingAwareObjectInputStream.resolveClass(ClassLoadingAwareObjectInputStream.java:55)[162:org.apache.activemq.activemq-osgi:5.11.0.redhat-630187]
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1819)[:1.8.0_121]
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)[:1.8.0_121]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1986)[:1.8.0_121]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)[:1.8.0_121]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)[:1.8.0_121]
at org.apache.activemq.command.ActiveMQObjectMessage.getObject(ActiveMQObjectMessage.java:206)[162:org.apache.activemq.activemq-osgi:5.11.0.redhat-630187]
... 24 more

安德斯只是我为测试压缩而上的一个班。

我在groovy中的测试程序:

import org.apache.activemq.ActiveMQConnectionFactory
import javax.jms.*
class Anders implements Serializable {
String name = "Anders Andersson"
int age=120
}
def brokerUrl = 'tcp://localhost:61616'
def queue = 'andersPanders'
def reader = new BufferedReader(new InputStreamReader(System.in))
new ActiveMQConnectionFactory(brokerURL: brokerUrl, userName: "admin", password: "admin").createConnection().with {
start()
createSession(false, Session.AUTO_ACKNOWLEDGE).with {
def message = createObjectMessage(new Anders())
createProducer().send(createQueue(queue), message)
}
close()
}

免责声明:ObjectMessage是JMS中的一种反模式,因为它引入了紧密耦合,并带来了安全问题。

也就是说,一个只适用于ActiveMQ而不适用于JMS的通用解决方案:

<route id="routeme">
<from uri="activemq:inputqueue?mapJmsMessage=false" />
<setBody>
<simple>${body.getContent().getData()}</simple>
</setBody>
<marshal ref="compressor"/>
<to uri="activemq:outputqueue" />
</route>

mapJmsMessage=false使Camel避免读取实际的Object并将主体保留为ActiveMQObjectMessage。使用该对象,您实际上可以获得支持ObjectMessage的byte[]。上述路由的输出将是一个BytesMessage

最新更新