异步处理交换,而原始有效负载保持不变



我们需要处理一个记录数超过10万的文件。每个记录都应该生成为XML并保存到DB中。我们能够实现可以处理50多万条记录的系统,现在我们有了一个新的要求,即将每条记录转换为另一种形式的XML,并保存在另一个表中以供审计。

我采用了以下方法来实现它。最初,每个记录都是从平面文件中读取的,转换为域契约,然后并行保存到审计表,并转换/丰富为另一种格式并保存到域表。这是我正在使用的示例路线。

        <route>
           <from uri="direct-vm:domainInXML" />
           <setHeader headerName="auditID"><groovy>UUID.randomUUID().toString()</groovy>  </setHeader>
           <!-- aysn transform and save domain XML to audit DB -->
           <inOnly uri="vm:auditInXMLTransformAndDBPersistor"/>
           <to uri="activemq:queue:domianInQueue?disableReplyTo=true"/>
        </route>
        <route>
             <from uri="activemq:queue:domianInQueue" />
             <!-- transform and enrich headers -->
             <to uri="xslt:xslt/convertToInternalDomainContract.xsl />
             <to uri="direct-vm:transformAndSaveTODomainDB"/>
        </route>
         <route>
             <from uri="vm:auditInXMLTransformAndDBPersistor?concurrentConsumers=3" />
             <!-- transform and enrich headers -->
             <to uri="xslt:xslt/convertToAuditDomainContract.xsl />
             <to uri="direct-vm:transformAndSaveTOAuditDB"/>
        </route>

这里的问题是,当我们正在处理数千条记录时,审计XML的转换和持久性是否在另一个线程中并行运行,而同一域XML是否被转换为另一种格式并保存到域DB?

会有延误吗?你能建议什么更好的方法吗?当我们将审核XML保存到DB时,最初我们将状态设置为"CREATE",在内部域的转换、验证和持久化失败时,我们需要使用头中的auditID将审核表中的状态更新为ERROR。

在处理过程中,当任何记录由于某些错误而无法处理时,我尝试在标头中使用auditID将状态更新为error,但到那时,可能会有未对auditDB进行审核XML的情况。如何解决这个问题?感谢您的帮助。

对于涉及审计窃听的用例,eip派上了用场。您可以参考http://camel.apache.org/wire-tap.根据Camel的定义,Wire Tap(来自EIP模式)允许您在将消息转发到最终目的地时将消息路由到单独的位置。

基本上,窃听创建了一个不同的线程,并使用原始交换的副本执行所需的功能。Inc失败的情况下,您可以使用相同的窃听路由来执行任何更新。为了确认它是按顺序处理的,您可以将并发消费者设置为1,这样您就不会陷入竞争状态。

相关内容

最新更新