我正试图将一个大文件写入mule中的file出站端点。我的要求是读取一个大文件,使用java transformer对其进行转换,然后将其写入文件出站端点。我不能使用datamapper,而且我的转换逻辑很复杂。变压器工作正常,但似乎没有流动。请提出建议。下面是我的示例代码:
流量:
<spring:beans>
<spring:bean id="responseTransformer" scope="prototype"
class="my.streaming.StreamTransformer">
</spring:bean>
</spring:beans>
<file:connector name="File" autoDelete="false"
streaming="true" validateConnections="true" doc:name="File" />
<flow name="mystreamflow">
<file:inbound-endpoint path="C:mylocation"
responseTimeout="10000" doc:name="File" connector-ref="File"
pollingFrequency="1000" />
<logger message="OUTPUT:#[payload]" level="INFO" doc:name="Logger" />
<file:outbound-endpoint path="C:output"
outputPattern="output.txt" responseTimeout="10000" doc:name="File"
transformer-refs="responseTransformer" connector-ref="File" />
</flow>
Java转换器:
@Override
public Object transformMessage(MuleMessage message, String outputEncoding)
throws TransformerException {
InputStream is = (InputStream) message.getPayload();
InputStreamReader isr = new InputStreamReader(is);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
BufferedWriter bw = null;
try {
bw = getWriter(bos);
while (isr.read() > 0) {
//transform and write
bw.write("writing something");
bw.flush();
System.out.println("Writing something.....");
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
try {
bos.close();
if (bw != null) {
bw.close();
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
return bos.toByteArray();
}
private BufferedWriter getWriter(ByteArrayOutputStream bos)
throws IOException {
BufferedWriter bw = null;
try {
BufferedOutputStream bo = new BufferedOutputStream(bos);
bw = new BufferedWriter(new OutputStreamWriter(bo, "UTF-8"));
} catch (IOException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
return bw;
}
转换器返回一个字节数组,而不是InputStream
,因此它不是流式转换器。这会占用内存,并导致非常大的文件出现问题。
您需要重写您的transformer,以便它返回一个InputStream
。
编辑-调查任一:
- 管道输入/输出流将转换的流输出连接到
PipedInputStream
,转换器返回Mule。小心线程:您需要在传递给Mule工作管理器的工作项中执行PipedOutputStream
编写器代码 - 创建自己的
InputStream
子类,该子类通过逐步读取入站InputStream
来逐步生成转换后的结果