我使用的消息工作流由消息驱动的通道适配器=>通道=>出站通道适配器。其目的是将消息从MqSeries代理传输到另一个MqSeries代理。这是事务性的(需要ack)
下面是它的相关部分(有些部分明显缺失。如果你认为它们是必需的,我会编辑我的帖子并添加它们)。
我的问题是关于消息头,特别是msgId。当我将messageId为的消息放入入站队列时,我希望它在整个管道中保持不变
但messageId在出站队列中进行转换,其内容将被生成的ID(包括出站队列管理器名称)替换。
来自发射器(这只是一个可能的发射代码的例子。我使用的每一个代码都有同样的问题,只要我提供一个msgId):
com.ibm.mq.MQMessage message = new MQMessage();
message.messageId=("TEST MessageId 1234").getBytes();
来自MQExplorer:
- 来自入站队列:MessageId=TEST MessageId 1234
- 来自出站队列:MessageId=AMQ<QM_NAME>lt;一些随机(?)码>
可能有一个明显的原因(但对我来说不是),但我现在不明白。我读到(好吗?)QM可以根据特定场景或特定命令生成消息Id。但我看不出它是如何应用于春季集成的。
有人知道Spring Integration如何处理messageId,以及我如何在整个管道中保持相同的消息吗?
<beans>
<int:channel id="channelMQ_MQ" ></int:channel>
<!-- Source : MQseries -->
<!- ... -->
<bean id="jmsQueue" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
</bean>
<!- ... -->
<bean id="myListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
<property name="autoStartup" value="false" />
<property name="connectionFactory" ref="connectionFactoryCaching" />
<property name="destination" ref="jmsQueue" />
<!- ... -->
<property name="sessionTransacted" value="true"/>
</bean>
<int-jms:message-driven-channel-adapter
id="jmsIn"
container="myListener"
channel="channelMQ_MQ"
error-channel="processChannel1"/>
<!-- Destination MQ_SERIES -->
<!- ... -->
<bean id="jmsQueue2" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
</bean>
<int-jms:outbound-channel-adapter channel="channelMQ_MQ"
id="jmsOut2"
destination="jmsQueue2"
connection-factory="connectionFactoryCaching2"
delivery-persistent="true"
explicit-qos-enabled="true"
session-transacted="true" >
</int-jms:outbound-channel-adapter>
</beans>
编辑1:
按照@artem-bilan的建议,我设置了一个头部富集器。但是atm根本不起作用。。。没有设置任何属性。
<int:channel id="channel_tmp">
</int:channel>
<int:header-enricher input-channel="channelMQ_MQ" output-channel="channel_tmp" id="headerEnricher1">
<int:header name="MSI" expression="headers.jms_messageId"/>
<int:header name="JMS_IBM_MQMD_MsgId" expression="headers.jms_messageId"/>
<int:header name="MSGID" expression="headers.jms_messageId"/>
<int:header name="MsgId" expression="headers.jms_messageId"/>
<int:header name="CorrelId" expression="headers.jms_messageId"/>
<int:header name="GroupId" expression="headers.jms_messageId"/>
<int:header name="MsggSeqNumber" expression="headers.jms_messageId"/>
<int:header name="offset" expression="headers.jms_messageId"/>
</int:header-enricher>
<int-jms:outbound-channel-adapter channel="channel_tmp"
id="jmsOut2"
destination="jmsQueue2"
connection-factory="connectionFactoryCaching2"
delivery-persistent="true"
explicit-qos-enabled="true"
session-transacted="true" >
</int-jms:outbound-channel-adapter>
编辑2:经过一番研究,我们发现一份IBM文档指出;为了能够设置消息ID,JMS目标队列需要具有属性"MQMD WRITE ENABLE";设置为ENABLED。此属性允许JMS应用程序设置MQMD字段的值"所以我们试图从JmsQueue:中设置此属性
<bean id="jmsQueue2" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDWriteEnabled" value="true"></property>
<property name="MQMDMessageContext" value="2"></property>
</bean>
不幸的是,尽管它很有希望,但这对messageId不起作用(但其他MQMD字段起作用)。
第3版:
根据Artem Bilan关于调试JmsHeaderMapper的建议,听起来我们发现字节数组不受头映射器(spring集成版本:5.3.2.REASE)的支持,但IBM期望。。。这导致报头基本上被跳过。因此,这不会以这种方式工作:
<int:header name="JMS_IBM_MQMD_MsgId" expression="headers['jms_messageId'].bytes"/>
编辑4:
在注意到当前版本的spring集成jms不接受";字节[]";类型(即IBMMSGID类型),我们添加了一个自定义标头映射器。它起作用了,但我们必须从已经映射的消息(看起来像"ID:3214F1044…")中检索(十六进制到字节)它,并将它作为字节数组作为";JMS_IBM_MQMD_MsgId";所有物这是一个可疑的解决方案,因为三重转换(MQ[BYTE24]=>JMS[ID:String]=>Java[Byte[]=>
最终,我们发现入站队列和出站队列都可以配置为传递所有上下文(JMS映射的头和原始MQ头)。因此,我们不必进行复杂的映射。。。。只有基本映射(因为字节[]在defaultHeaderMapper中仍然没有映射)。
所以最终的解决方案是:
<bean id="jmsQueueIN" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDMessageContext" value="2"></property>
<property name="MQMDReadEnabled" value="true"></property>
</bean>
<bean id="jmsQueueOut" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDWriteEnabled" value="true"></property>
<property name="MQMDMessageContext" value="2"></property>
</bean>
<bean id="mqCompatibleJmsHeaderMapper" class="com.my.company.mappers.MqCompatibleJmsHeaderMapper"/>
<int-jms:outbound-channel-adapter channel="channel_MQ_MQ"
id="jmsOut"
destination="jmsQueueOUT"
...
header-mapper="mqCompatibleJmsHeaderMapper">
...
</int-jms:outbound-channel-adapter>
_
public class MqCompatibleJmsHeaderMapper extends DefaultJmsHeaderMapper {
...
public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
Object messageId = headers.get(WMQConstants.JMS_IBM_MQMD_MSGID);
if(messageId !=null) {
if (messageId instanceof byte[]) {
jmsMessage.setObjectProperty(WMQConstants.JMS_IBM_MQMD_MSGID, messageId);
}else {
...
}
}
super.fromHeaders(headers, jmsMessage);
}
...
}
对我来说,这并不明显……你的意思是,这种行为是";正常的";并且我应该使用headerMapper来更改它;jms_;前缀
好吧,该标头映射在DefaultJmsHeaderMapper
中,如下所示:headers.put(JmsHeaders.MESSAGE_ID, messageId);
。因此,它确实是一个jms_messageId
。而且它实际上并没有映射到出站端:
if (StringUtils.hasText(headerName) &&
!headerName.startsWith(JmsHeaders.PREFIX) &&
jmsMessage.getObjectProperty(headerName) == null) {
我认为忽略它们是有原因的,因为并非所有JMS供应商都允许覆盖所有这些org.springframework.jms.support.JmsHeaders
。
对于您的用例,您可以在<int-jms:outbound-channel-adapter>
:之前执行此操作
<header-enricher>
<header name="messageId" expression="headers.jms_messageId"/>
</header-enricher>
这个答案计算了帮助解决我们问题的各种(非常有用!)注释和答案。
在注意到当前版本的spring集成jms不接受";字节[]";类型(即IBMMSGID类型),我们添加了一个自定义标头映射器。
原始的RAW msgId在入站中不可用:相反,我们收到了JMS映射的头例如:
JMSXAppID=com.my.company.test.MqProducer
jms_replyTo=queue://QM2/QUEUE.OUT.MOBA?targetClient=1jms_correlationId=ID:4142434423313233343536373839305f310000000000000000jms_messageId=ID:4142434423313233343536373839305f310000000000000000
它们是通过这种方式从与入站适配器相连的listner接收的
因此,初稿使用headerMapper将jmsMessageId(Id:…)转换为IBM Mq BYTE24可编译文件(因此:byte[]转换为">JMS_IBM_MQMD_MsgId"属性)
但这是一个危险的解决方案,因为存在三重转换(MQ[BYTE24]=>JMS[ID:String]=>Java[Byte[]=>
最终,我们发现可以配置入站队列和出站队列,以便它们将传递所有上下文(jms映射的头和原始MQMD头):使用MQMDMessageContext=CMCQ.MQPMO_SET_all_context(2)和MQMDRead/WriteEnabled=true。这样,所有必需的字段从一开始就可用:
JMS_IBM_MQMD_PutApplName=CustomOwnApplName
JMSXAppID=com.my.company.test.MqProducer
JMS_IBM-MQMD_ReplyToQ=
JMS_replyTo=queue://QM2/QUEUE.OUT.MOBA?targetClient=1
JMS_IBM_MQMD_CorrelId=[B@47e0d39f
JMS_correlationId=ID:4142434423313233343536373839305f310000000000000000
JMS_IBM_MQMD_MsgId=[B@3999141ee
jns_messageId=ID:414243 442331323344353637383930 5f310000000000
因此,我们不必进行可疑的映射。。。。只有基本映射(因为字节[]在defaultHeaderMapper中仍然没有映射)。
所以最终的解决方案是:
<bean id="jmsQueueIN" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDMessageContext" value="2"></property>
<property name="MQMDReadEnabled" value="true"></property>
</bean>
<bean id="jmsQueueOut" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDWriteEnabled" value="true"></property>
<property name="MQMDMessageContext" value="2"></property>
</bean>
<bean id="mqCompatibleJmsHeaderMapper" class="com.my.company.mappers.MqCompatibleJmsHeaderMapper"/>
<int-jms:outbound-channel-adapter channel="channel_MQ_MQ"
id="jmsOut"
destination="jmsQueueOUT"
...
header-mapper="mqCompatibleJmsHeaderMapper">
...
</int-jms:outbound-channel-adapter>
_
public class MqCompatibleJmsHeaderMapper extends DefaultJmsHeaderMapper {
...
public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
Object messageId = headers.get(WMQConstants.JMS_IBM_MQMD_MSGID);
if(messageId !=null) {
if (messageId instanceof byte[]) {
jmsMessage.setObjectProperty(WMQConstants.JMS_IBM_MQMD_MSGID, messageId);
}else {
...
}
}
super.fromHeaders(headers, jmsMessage);
}
...
}