Mule 4正在修改Java对象变量



我正在将Mule应用程序从3.9.0迁移到Mule 4.4.0。我使用Java对象在传输中传播元数据和其他对象。以下是该类的一个片段:

package com.test;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
/**
 * Serializable envelope that carries a payload, the payload it was created with, and a bag of
 * named properties.
 *
 * <p>The original payload is fixed at construction time; the current payload can be replaced
 * through {@link #setPayload(Object)}. No synchronization is performed on the property map.
 *
 * <p>NOTE(review): Java serialization of this wrapper can only succeed when the payloads and the
 * property values are themselves serializable.
 */
public class DataWrapper implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Map<String, Object> properties = new HashMap<>();
    private Object payload;
    private final Object originalPayload;

    /**
     * Wraps {@code payload}, using it as both the current and the original payload.
     *
     * @param payload the value to wrap; may be {@code null}
     */
    public DataWrapper(final Object payload) {
        this(payload, false);
    }

    /**
     * Wraps {@code payload}, using it as both the current and the original payload.
     *
     * @param payload the value to wrap; may be {@code null}
     * @param playback currently unused; kept so existing callers keep compiling
     */
    public DataWrapper(final Object payload, boolean playback) {
        this.originalPayload = payload;
        this.payload = payload;
    }

    /**
     * Looks up a property, falling back to {@code defaultValue} when no entry exists for the name.
     * An entry that is present but mapped to {@code null} yields {@code null}, not the default.
     *
     * @param name the property name
     * @param defaultValue the value returned when {@code name} has no entry
     * @param <T> the type the caller expects; the cast is unchecked
     * @return the stored value or {@code defaultValue}
     */
    @SuppressWarnings("unchecked")
    public <T> T getProperty(String name, T defaultValue) {
        if (properties.containsKey(name)) {
            return (T) properties.get(name);
        }
        return defaultValue;
    }

    /**
     * Looks up a property.
     *
     * @param name the property name
     * @return the stored value, or {@code null} when there is no entry
     */
    public Object getProperty(String name) {
        return getProperty(name, null);
    }

    /**
     * Stores a single property, replacing any previous value under the same name.
     *
     * @param name the property name
     * @param value the value to store
     */
    public void setProperty(String name, Object value) {
        properties.put(name, value);
    }

    /**
     * Copies every entry of {@code additional} into this wrapper's properties.
     *
     * @param additional the entries to add; {@code null} is treated as "nothing to add"
     */
    public void setProperties(Map<String, Object> additional) {
        // A null map used to fail inside putAll; callers that have no extra properties
        // (for example wrapData(payload, null)) should not have to build an empty map.
        if (additional != null) {
            properties.putAll(additional);
        }
    }

    /** @return the current payload */
    public Object getPayload() {
        return payload;
    }

    /**
     * Replaces the current payload; the original payload is left untouched.
     *
     * @param payload the new current payload
     */
    public void setPayload(Object payload) {
        this.payload = payload;
    }

    /** @return the payload this wrapper was constructed with */
    public Object getOriginalPayload() {
        return originalPayload;
    }

    /**
     * Factory equivalent of {@link #DataWrapper(Object)}.
     *
     * @param payload the value to wrap
     * @return a new wrapper without properties
     */
    public static DataWrapper wrapData(Object payload) {
        return new DataWrapper(payload);
    }

    /**
     * Creates a wrapper and pre-populates its properties.
     *
     * @param payload the value to wrap
     * @param properties the initial properties; {@code null} means none
     * @return a new wrapper holding the given properties
     */
    public static DataWrapper wrapData(Object payload, Map<String, Object> properties) {
        DataWrapper wrapper = new DataWrapper(payload);
        wrapper.setProperties(properties);
        return wrapper;
    }
}

被调用的静态方法(示例已简化):

/**
 * Static helpers invoked from the Mule flows to work on a {@link DataWrapper}.
 */
public class MyTransformers {

    /**
     * Adds the given properties to the wrapper and hands the same wrapper back.
     *
     * @param wrapper the wrapper to enrich
     * @param properties the entries to copy into the wrapper
     * @return the {@code wrapper} instance that was passed in
     */
    public static DataWrapper addProperties(DataWrapper wrapper, Map<String, Object> properties) {
        // process payload and add properties/objects
        final DataWrapper enriched = wrapper;
        enriched.setProperties(properties);
        return enriched;
    }

    /**
     * Returns the wrapper's current payload cast to {@code String}.
     *
     * @param wrapper the wrapper whose payload is read
     * @param properties not read by this simplified version
     * @return the current payload as a {@code String}
     */
    public static String doSomethingWithPayload(DataWrapper wrapper, Map<String, Object> properties) {
        // process payload and add properties/objects
        final Object body = wrapper.getPayload();
        return (String) body;
    }

    /**
     * Returns the wrapper's current payload cast to {@code String}.
     *
     * @param wrapper the wrapper whose payload is read
     * @return the current payload as a {@code String}
     */
    public static String doSomethingElse(DataWrapper wrapper) {
        // do something else with payload
        final Object body = wrapper.getPayload();
        return (String) body;
    }
}

以及流程:

<?xml version="1.0" encoding="UTF-8"?>
<mule 
xmlns:sockets="http://www.mulesoft.org/schema/mule/sockets"
xmlns:vm="http://www.mulesoft.org/schema/mule/vm" 
xmlns:java="http://www.mulesoft.org/schema/mule/java" 
xmlns="http://www.mulesoft.org/schema/mule/core" 
xmlns:doc="http://www.mulesoft.org/schema/mule/documentation" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://www.mulesoft.org/schema/mule/core http://www.mulesoft.org/schema/mule/core/current/mule.xsd
http://www.mulesoft.org/schema/mule/vm http://www.mulesoft.org/schema/mule/vm/current/mule-vm.xsd
http://www.mulesoft.org/schema/mule/java http://www.mulesoft.org/schema/mule/java/current/mule-java.xsd
http://www.mulesoft.org/schema/mule/sockets http://www.mulesoft.org/schema/mule/sockets/current/mule-sockets.xsd">

<!-- VM connector configuration declaring the two queues, ReportsEndpoint and PublishEndpoint. -->
<vm:config name="DefaultVmConnector" doc:name="VM Config" doc:id="786119a5-8fd8-40b8-a145-bfbf5df6a505" >
<vm:queues >
<vm:queue queueName="ReportsEndpoint" />
<vm:queue queueName="PublishEndpoint" />
</vm:queues>
</vm:config>
<!-- Sockets listener configuration: UDP listener connection on 192.168.1.203:12121. -->
<sockets:listener-config name="UDP_Config" doc:name="Sockets Listener config" doc:id="b2ec3926-6a0b-4612-97e8-fe9f4011fbad" >
<sockets:udp-listener-connection host="192.168.1.203" port="12121" />
</sockets:listener-config>
<!--
  Receives a message from the UDP_Config sockets listener, wraps the payload with the static
  DataWrapper.wrapData(Object) method and publishes the result to the PublishEndpoint VM queue.
  NOTE(review): the payload is handed to wrapData as-is; if the listener delivers a stream
  provider rather than a String, that is what ends up inside the wrapper - confirm.
-->
<flow name="TestPublishFlow" doc:id="05a95422-080b-491e-9a4d-efc82f10b399" >
<sockets:listener doc:name="Listener" doc:id="5be9224a-be1b-4779-a137-121376cd890d" config-ref="UDP_Config" outputMimeType="text/json"/>
<java:invoke-static doc:name="Data Wrapper" doc:id="9fa629cc-0520-4c00-beae-15f849568d3b" class="com.test.DataWrapper" method="wrapData(java.lang.Object)" outputMimeType="application/java">
<java:args ><![CDATA[#[{payload : payload}]]]></java:args>
</java:invoke-static>
<vm:publish doc:name="Publish to VM" doc:id="b6497fb1-3a57-468c-bcb6-089372407787" config-ref="DefaultVmConnector" sendCorrelationId="NEVER" queueName="PublishEndpoint"/>
</flow>
<!--
  Consumes messages from the PublishEndpoint VM queue (two consumers), adds configuration-driven
  properties to the DataWrapper payload via MyTransformers.addProperties, then routes to one of
  two MyTransformers methods and logs the resulting payload.
-->
<flow name="publisher-flow" doc:id="4c765b98-68df-4af2-a517-73bddfe8cc25" >
<vm:listener doc:name="Publish Endpoint" doc:id="a8589268-f5ea-4ebe-a11f-760ff32c2e1f" config-ref="DefaultVmConnector" numberOfConsumers="2" queueName="PublishEndpoint"/>
<java:invoke-static doc:name="Some Business Logic" doc:id="ee53fe99-f459-4581-892e-00e09a8a10e2" class="com.test.MyTransformers" outputMimeType="application/java" method="addProperties(com.test.DataWrapper,java.util.Map)">
<java:args ><![CDATA[#[{ wrapper:payload, properties:{ ClientId:p('source.clientId'), Source:p('data.source'), Metadata:p('data.metadata'), MimeType:p('data.mimetype')}}]]]></java:args>
</java:invoke-static>
<choice doc:name="Choice" doc:id="7976bc82-67d4-4534-b47b-80ed7e287f07" >
<!-- NOTE(review): the condition tests payload.payload (the wrapped value), not the wrapper itself - confirm this is intended. -->
<when expression="#[Java::isInstanceOf(payload.payload, 'com.test.DataWrapper')]">
<java:invoke-static doc:name="Do Something" doc:id="528cdafa-9be1-4433-aaf5-0e0352078757" outputMimeType="application/java" class="com.test.MyTransformers" method="doSomethingWithPayload(com.test.DataWrapper,java.util.Map)">
<!-- The method takes (wrapper, properties), so the two settings are nested under "properties" instead of being passed as separate arguments. -->
<java:args ><![CDATA[#[{ wrapper : payload, properties : { prettyprint : p('xml.prettyprint'), validate : p('xml.validate') } }]]]></java:args>
</java:invoke-static>
</when>
<otherwise >
<java:invoke-static doc:name="Do Something Else" doc:id="6d63d9e7-12be-4d43-9990-02b897ec0cee" class="com.test.MyTransformers" method="doSomethingElse(com.test.DataWrapper)" outputMimeType="application/java">
<java:args ><![CDATA[#[{ wrapper : payload }]]]></java:args>
</java:invoke-static>
</otherwise>
</choice>
<logger level="INFO" doc:name="Placeholder" doc:id="800d22cd-55a5-4ee9-a280-497d2f276a63" message="#[payload]"/>
</flow>
</mule>

我需要保留原始消息和处理后的消息以供以后使用。

当payload和originalPayload设置为DataWrapper中的String对象,并且mule消息通过VM传输时,这些对象将变为org.mule.runtime.core.internal.streaming.bytes.ManagedCursorStreamProvider.

在发送到VM发布者之前,DataWrapper中的字段payload和originalpayload是:

{ "count" : 0, "comment" : "This is a test"}

之后是:

org.mule.runtime.core.internal.streaming.bytes.ManagedCursorStreamProvider@162b291f

在通过VM端点时,我还得到了一个序列化异常:

Caused by: java.io.NotSerializableException: org.mule.runtime.core.internal.streaming.bytes.ManagedCursorStreamProvider

在不将有效载荷和originalPayload的类型从Object更改为String的情况下,我如何防止这种情况发生?

Mule 4在大多数组件中实现了流式传输,而Mule 3只为某些组件实现了流式传输。HTTP侦听器返回的有效负载是由ManagedCursorStreamProvider实现的流。它不是由VM连接器添加的,而是原始负载本身。Mule 4中的大多数组件现在都知道如何透明地处理流,因此用户通常不需要关心他们的JSON负载是在流中还是在Java字符串中。例外情况是,当您编写实际的Java代码时,您需要了解对象的实际类型。

将特定于Mule的API和类添加到您的自定义Java代码中以使它们具有"流意识",这不是一种好的做法,也不鼓励这样做。相反,一个更简单的解决方案是让DataWeave实现其转换魔法,并自动转换为Java对象,在本例中为String。

示例:

<!--
  Converts the listener payload with a DataWeave transform before it is passed to
  DataWrapper.wrapData(Object), so that the wrapper receives the transform's output.
  NOTE(review): the script calls write(payload,"application/java"); confirm that this format
  yields a String here - "application/json" may have been intended for a JSON payload.
-->
<sockets:listener doc:name="Listener" config-ref="UDP_Config" outputMimeType="text/json"/> 
<ee:transform doc:name="Transform Message">
<ee:message >
<ee:set-payload ><![CDATA[%dw 2.0
output application/java
---
write(payload,"application/java")
]]>
</ee:set-payload>
</ee:message>
</ee:transform>
<java:invoke-static doc:name="Data Wrapper" class="com.test.DataWrapper" method="wrapData(java.lang.Object)" outputMimeType="application/java">
<java:args ><![CDATA[#[{payload : payload}]]]></java:args>
</java:invoke-static>

write() 函数会根据负载写出一个字符串,该字符串可以在包装类中使用。这适用于此特定场景。如果输入和Java代码有其他组合方式,则需要相应修改该类。

最新更新