我有一个包含 500k 条记录的输入文件。我需要批量处理这些记录,应用转换并写入输出文件。我正在尝试使用以程进行一些实验。批处理块大小设置为 1000。输出文件仅包含 1000 条记录。其余的 490k 记录将丢失。
根据我的理解,批处理为每个块大小启动一个新实例,在这种情况下,每 1000 条记录将由新线程处理。这些线程是否相互覆盖?如何将所有转换后的记录收集到输出文件中?
<flow name="poll-inbound-file">
<file:inbound-endpoint path="${file.inbound.location}"
pollingFrequency="${file.polling.frequency}" responseTimeout="10000"
doc:name="File" metadata:id="abce53af-7d82-411a-a75a-5cd8ae8e55ae"
fileAge="${file.fileage}" moveToDirectory="${file.outbound.location}"/>
<custom-interceptor
class="com.example.TimerInterceptor" doc:name="Timer" />
<dw:transform-message doc:name="Transform Message"
metadata:id="dcf84872-5aca-404f-9169-d448c9e4cd76">
<dw:input-payload mimeType="application/csv" />
<dw:set-payload><![CDATA[%dw 1.0
%output application/java
---
payload as :iterator]]></dw:set-payload>
</dw:transform-message>
<batch:job name="process-batchBatch" block-size="${batch.blocksize}">
<batch:process-records>
<batch:step name="Batch_Step1">
<logger level="TRACE" doc:name="Logger" message="#[payload]" />
</batch:step>
<batch:step name="Batch_Step2">
<logger level="TRACE" doc:name="Logger" message="#[payload]" />
</batch:step>
<batch:step name="Batch_Step3">
<batch:commit doc:name="Batch Commit" size="1000">
<expression-component doc:name="Expression"><![CDATA[StringBuilder sb=new StringBuilder();
for(String s: payload)
{
sb.append(s);
sb.append(System.lineSeparator());
}
payload= sb.toString();]]></expression-component>
<file:outbound-endpoint path="${file.outbound.location}"
responseTimeout="10000" doc:name="File" />
</batch:commit>
</batch:step>
</batch:process-records>
<batch:on-complete>
<logger
message="******************************************** Batch Report **************************************"
level="INFO" doc:name="Logger" />
</batch:on-complete>
</batch:job>
</flow>
同时从多个线程写入文件通常不安全。相反,将结果写入队列(如 ActiveMQ 等(,并具有另一个流,该流从队列中读取,然后写入文件。您可以决定是否要在处理文件之前或之后从队列开始处理。