使用spring集成在2个MQ代理之间保留消息id



我使用的消息工作流由消息驱动的通道适配器=>通道=>出站通道适配器。其目的是将消息从MqSeries代理传输到另一个MqSeries代理。这是事务性的(需要ack)

下面是它的相关部分(有些部分明显缺失。如果你认为它们是必需的,我会编辑我的帖子并添加它们)。

我的问题是关于消息头,特别是msgId。当我将messageId为的消息放入入站队列时,我希望它在整个管道中保持不变
但messageId在出站队列中进行转换,其内容将被生成的ID(包括出站队列管理器名称)替换。

来自发射器(这只是一个可能的发射代码的例子。我使用的每一个代码都有同样的问题,只要我提供一个msgId):

com.ibm.mq.MQMessage message = new MQMessage(); 
message.messageId=("TEST MessageId 1234").getBytes();

来自MQExplorer:

  • 来自入站队列:MessageId=TEST MessageId 1234
  • 来自出站队列:MessageId=AMQ<QM_NAME>lt;一些随机(?)码>

可能有一个明显的原因(但对我来说不是),但我现在不明白。我读到(好吗?)QM可以根据特定场景或特定命令生成消息Id。但我看不出它是如何应用于春季集成的。

有人知道Spring Integration如何处理messageId,以及我如何在整个管道中保持相同的消息吗?

<beans>

<int:channel id="channelMQ_MQ" ></int:channel>
<!-- Source : MQseries -->
<!- ... -->
<bean id="jmsQueue" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...       
</bean>
<!- ... -->
<bean id="myListener" class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
<property name="autoStartup" value="false" />
<property name="connectionFactory" ref="connectionFactoryCaching" />
<property name="destination" ref="jmsQueue" />
<!- ... -->         
<property name="sessionTransacted" value="true"/>
</bean>

<int-jms:message-driven-channel-adapter 
id="jmsIn" 
container="myListener" 
channel="channelMQ_MQ" 
error-channel="processChannel1"/>

<!-- Destination MQ_SERIES      -->
<!- ... -->
<bean id="jmsQueue2" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
</bean>

<int-jms:outbound-channel-adapter   channel="channelMQ_MQ" 
id="jmsOut2" 
destination="jmsQueue2" 
connection-factory="connectionFactoryCaching2" 
delivery-persistent="true" 
explicit-qos-enabled="true" 
session-transacted="true" >
</int-jms:outbound-channel-adapter>

</beans>

编辑1:

按照@artem-bilan的建议,我设置了一个头部富集器。但是atm根本不起作用。。。没有设置任何属性。

<int:channel id="channel_tmp">
</int:channel>


<int:header-enricher input-channel="channelMQ_MQ"  output-channel="channel_tmp"  id="headerEnricher1">
<int:header name="MSI" expression="headers.jms_messageId"/>
<int:header name="JMS_IBM_MQMD_MsgId" expression="headers.jms_messageId"/>
<int:header name="MSGID" expression="headers.jms_messageId"/>
<int:header name="MsgId" expression="headers.jms_messageId"/>
<int:header name="CorrelId" expression="headers.jms_messageId"/>
<int:header name="GroupId" expression="headers.jms_messageId"/>
<int:header name="MsggSeqNumber" expression="headers.jms_messageId"/>
<int:header name="offset" expression="headers.jms_messageId"/>
</int:header-enricher>

<int-jms:outbound-channel-adapter   channel="channel_tmp" 
id="jmsOut2" 
destination="jmsQueue2" 
connection-factory="connectionFactoryCaching2" 
delivery-persistent="true" 
explicit-qos-enabled="true" 
session-transacted="true" >
</int-jms:outbound-channel-adapter>

编辑2:经过一番研究,我们发现一份IBM文档指出;为了能够设置消息ID,JMS目标队列需要具有属性"MQMD WRITE ENABLE";设置为ENABLED。此属性允许JMS应用程序设置MQMD字段的值"所以我们试图从JmsQueue:中设置此属性

<bean id="jmsQueue2" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDWriteEnabled" value="true"></property>
<property name="MQMDMessageContext" value="2"></property>
</bean>

不幸的是,尽管它很有希望,但这对messageId不起作用(但其他MQMD字段起作用)。

第3版:

根据Artem Bilan关于调试JmsHeaderMapper的建议,听起来我们发现字节数组不受头映射器(spring集成版本:5.3.2.REASE)的支持,但IBM期望。。。这导致报头基本上被跳过。因此,这不会以这种方式工作:

<int:header name="JMS_IBM_MQMD_MsgId" expression="headers['jms_messageId'].bytes"/>

编辑4:

在注意到当前版本的spring集成jms不接受";字节[]";类型(即IBMMSGID类型),我们添加了一个自定义标头映射器。它起作用了,但我们必须从已经映射的消息(看起来像"ID:3214F1044…")中检索(十六进制到字节)它,并将它作为字节数组作为";JMS_IBM_MQMD_MsgId";所有物这是一个可疑的解决方案,因为三重转换(MQ[BYTE24]=>JMS[ID:String]=>Java[Byte[]=>
最终,我们发现入站队列和出站队列都可以配置为传递所有上下文(JMS映射的头和原始MQ头)。因此,我们不必进行复杂的映射。。。。只有基本映射(因为字节[]在defaultHeaderMapper中仍然没有映射)。

所以最终的解决方案是:

<bean id="jmsQueueIN" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDMessageContext" value="2"></property>
<property name="MQMDReadEnabled" value="true"></property>
</bean>
<bean id="jmsQueueOut" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDWriteEnabled" value="true"></property>
<property name="MQMDMessageContext" value="2"></property>       
</bean>

<bean id="mqCompatibleJmsHeaderMapper" class="com.my.company.mappers.MqCompatibleJmsHeaderMapper"/> 
<int-jms:outbound-channel-adapter   channel="channel_MQ_MQ" 
id="jmsOut" 
destination="jmsQueueOUT" 
... 
header-mapper="mqCompatibleJmsHeaderMapper">
...                                     
</int-jms:outbound-channel-adapter>

_

public class MqCompatibleJmsHeaderMapper extends DefaultJmsHeaderMapper {
...
public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
Object messageId = headers.get(WMQConstants.JMS_IBM_MQMD_MSGID);
if(messageId !=null) {
if (messageId instanceof byte[]) {
jmsMessage.setObjectProperty(WMQConstants.JMS_IBM_MQMD_MSGID, messageId);
}else  {
...
}
}
super.fromHeaders(headers, jmsMessage);
}
...
}

对我来说,这并不明显……你的意思是,这种行为是";正常的";并且我应该使用headerMapper来更改它;jms_;前缀

好吧,该标头映射在DefaultJmsHeaderMapper中,如下所示:headers.put(JmsHeaders.MESSAGE_ID, messageId);。因此,它确实是一个jms_messageId。而且它实际上并没有映射到出站端:

if (StringUtils.hasText(headerName) &&
!headerName.startsWith(JmsHeaders.PREFIX) &&
jmsMessage.getObjectProperty(headerName) == null) {

我认为忽略它们是有原因的,因为并非所有JMS供应商都允许覆盖所有这些org.springframework.jms.support.JmsHeaders

对于您的用例,您可以在<int-jms:outbound-channel-adapter>:之前执行此操作

<header-enricher>
<header name="messageId" expression="headers.jms_messageId"/>
</header-enricher>

这个答案计算了帮助解决我们问题的各种(非常有用!)注释和答案。

在注意到当前版本的spring集成jms不接受";字节[]";类型(即IBMMSGID类型),我们添加了一个自定义标头映射器。

原始的RAW msgId在入站中不可用:相反,我们收到了JMS映射的头例如:

JMSXAppID=com.my.company.test.MqProducer
jms_replyTo=queue://QM2/QUEUE.OUT.MOBA?targetClient=1jms_correlationId=ID:4142434423313233343536373839305f310000000000000000jms_messageId=ID:4142434423313233343536373839305f310000000000000000

它们是通过这种方式从与入站适配器相连的listner接收的
因此,初稿使用headerMapper将jmsMessageId(Id:…)转换为IBM Mq BYTE24可编译文件(因此:byte[]转换为">JMS_IBM_MQMD_MsgId"属性)

但这是一个危险的解决方案,因为存在三重转换(MQ[BYTE24]=>JMS[ID:String]=>Java[Byte[]=>

最终,我们发现可以配置入站队列和出站队列,以便它们将传递所有上下文(jms映射的头和原始MQMD头):使用MQMDMessageContext=CMCQ.MQPMO_SET_all_context(2)和MQMDRead/WriteEnabled=true。这样,所有必需的字段从一开始就可用:

JMS_IBM_MQMD_PutApplName=CustomOwnApplName
JMSXAppID=com.my.company.test.MqProducer
JMS_IBM-MQMD_ReplyToQ=
JMS_replyTo=queue://QM2/QUEUE.OUT.MOBA?targetClient=1
JMS_IBM_MQMD_CorrelId=[B@47e0d39f
JMS_correlationId=ID:4142434423313233343536373839305f310000000000000000
JMS_IBM_MQMD_MsgId=[B@3999141ee
jns_messageId=ID:414243 442331323344353637383930 5f310000000000

因此,我们不必进行可疑的映射。。。。只有基本映射(因为字节[]在defaultHeaderMapper中仍然没有映射)。

所以最终的解决方案是:

<bean id="jmsQueueIN" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDMessageContext" value="2"></property>
<property name="MQMDReadEnabled" value="true"></property>
</bean>
<bean id="jmsQueueOut" class="com.ibm.mq.jms.MQQueue" depends-on="jmsConnectionFactory">
...
<property name="MQMDWriteEnabled" value="true"></property>
<property name="MQMDMessageContext" value="2"></property>       
</bean>

<bean id="mqCompatibleJmsHeaderMapper" class="com.my.company.mappers.MqCompatibleJmsHeaderMapper"/> 
<int-jms:outbound-channel-adapter   channel="channel_MQ_MQ" 
id="jmsOut" 
destination="jmsQueueOUT" 
... 
header-mapper="mqCompatibleJmsHeaderMapper">
...                                     
</int-jms:outbound-channel-adapter>

_

public class MqCompatibleJmsHeaderMapper extends DefaultJmsHeaderMapper {
...
public void fromHeaders(MessageHeaders headers, Message jmsMessage) {
Object messageId = headers.get(WMQConstants.JMS_IBM_MQMD_MSGID);
if(messageId !=null) {
if (messageId instanceof byte[]) {
jmsMessage.setObjectProperty(WMQConstants.JMS_IBM_MQMD_MSGID, messageId);
}else  {
...
}
}
super.fromHeaders(headers, jmsMessage);
}
...
}

相关内容

  • 没有找到相关文章

最新更新