我在4个不同的服务器上运行4个基于Spring Boot Integration的应用程序实例。过程是:
- 在共享文件夹中逐个读取XML文件
- 处理文件(检查结构、内容…(、转换数据并发送电子邮件
- 在另一个共享文件夹中编写有关此文件的报告
- 删除已成功处理的文件
我正在寻找一个非阻塞和安全的解决方案来处理这些文件。
用例:
- 如果一个实例在读取或处理文件时崩溃(因此不会结束集成链(:另一个实例必须处理该文件,或者同一个实例重新启动后必须处理该
- 如果一个实例正在处理文件,则其他实例不得处理该文件
我已经构建了这个Spring Integration XML配置文件(它包括带有共享H2数据库的JDBC元数据存储(:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
<int:poller default="true" fixed-rate="1000"/>
<int:channel id="inputFilesChannel">
<int:queue/>
</int:channel>
<!-- Input -->
<int-file:inbound-channel-adapter
id="inputFilesAdapter"
channel="inputFilesChannel"
directory="file:${input.files.path}"
ignore-hidden="true"
comparator="lastModifiedFileComparator"
filter="compositeFilter">
<int:poller fixed-rate="10000" max-messages-per-poll="1" task-executor="taskExecutor"/>
</int-file:inbound-channel-adapter>
<task:executor id="taskExecutor" pool-size="1"/>
<!-- Metadatastore -->
<bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
<property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
<property name="driverClassName" value="org.h2.Driver"/>
<property name="username" value="${database.username}"/>
<property name="password" value="${database.password}"/>
<property name="maxIdle" value="4"/>
</bean>
<bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
<constructor-arg ref="jdbcDataSource"/>
</bean>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="jdbcDataSource"/>
</bean>
<bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
<constructor-arg index="0" ref="jdbcMetadataStore"/>
<constructor-arg index="1" value="files"/>
</bean>
</list>
</constructor-arg>
</bean>
<!-- Workflow -->
<int:chain input-channel="inputFilesChannel" output-channel="outputFilesChannel">
<int:service-activator ref="fileActivator" method="fileRead"/>
<int:service-activator ref="fileActivator" method="fileProcess"/>
<int:service-activator ref="fileActivator" method="fileAudit"/>
</int:chain>
<bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/>
<int-file:outbound-channel-adapter
id="outputFilesChannel"
directory="file:${output.files.path}"
filename-generator-expression ="payload.name">
<int-file:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
<property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/>
</bean>
</int-file:request-handler-advice-chain>
</int-file:outbound-channel-adapter>
</beans>
问题:对于多个文件,当成功处理一个文件时,事务会提交元数据存储中的其他现有文件(表INT_METADATA_STORE
(。因此,如果重新启动应用程序,其他文件将永远不会被处理(如果应用程序在处理第一个文件时崩溃,它会正常工作(。它似乎只适用于读取文件,而不适用于处理集成链中的文件。。。如何逐文件管理JVM崩溃时的回滚事务?
非常感谢您的帮助。这会让我发疯的:(
谢谢!
编辑/注释:
灵感来自https://github.com/caoimhindenais/spring-integration-files/blob/master/src/main/resources/context.xml
我已经用Artem Bilan的回答更新了我的配置。并删除
poller
块中的transactional
块:我在实例之间发生了事务冲突(丑陋的表锁异常(。尽管行为是一样的我在
poller
块中测试此配置失败(行为相同(:<int:advice-chain> <tx:advice id="txAdvice" transaction-manager="transactionManager"> <tx:attributes> <tx:method name="file*" timeout="30000" propagation="REQUIRED"/> </tx:attributes> </tx:advice> </int:advice-chain>
也许基于Idempontent Receiver Enterprise Integration Pattern的解决方案可以工作。但我没能配置它……我找不到确切的文档
您不应该使用PseudoTransactionManager
,而应该使用DataSourceTransactionManager
。
由于使用JdbcMetadataStore
,它将参与事务,如果下游流失败,元数据存储中的条目也将回滚。
好的。我找到了一个可行的解决方案。也许不是最干净的,但它有效:
- 多个实例位于不同的服务器上,共享同一个H2数据库(网络文件夹装载(。我认为它应该通过远程TCP工作。MVCC已在H2上激活(请查看其文档(
inbound-channel-adapter
激活了scan-each-poll
选项,以允许重新填充以前可以忽略的文件(如果进程已经由另一个实例开始(。因此,如果另一个实例崩溃,则可以在不重新启动该实例的情况下再次轮询和处理该文件- 在DB上将选项
defaultAutoCommit
设置为false
- 我没有使用
FileSystemPersistentAcceptOnceFileListFilter
,因为当一个文件被成功处理时,它正在聚合元数据存储中的所有读取文件。我没能在我的上下文中使用它 -
我通过过滤器和事务同步在表达式中编写了自己的条件和操作。
<!-- Input --> <bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/> <int-file:inbound-channel-adapter id="inputAdapter" channel="inputChannel" directory="file:${input.files.path}" comparator="lastModifiedFileComparator" scan-each-poll="true"> <int:poller max-messages-per-poll="1" fixed-rate="5000"> <int:transactional transaction-manager="transactionManager" isolation="READ_COMMITTED" propagation="REQUIRED" timeout="60000" synchronization-factory="syncFactory"/> </int:poller> </int-file:inbound-channel-adapter> <!-- Continue only if the concurrentmetadatastore doesn't contain the file. If if is not the case : insert it in the metadatastore --> <int:filter input-channel="inputChannel" output-channel="processChannel" discard-channel="nullChannel" throw-exception-on-rejection="false" expression="@jdbcMetadataStore.putIfAbsent(headers[file_name], headers[timestamp]) == null"/> <!-- Rollback by removing the file from the metadatastore --> <int:transaction-synchronization-factory id="syncFactory"> <int:after-rollback expression="@jdbcMetadataStore.remove(headers[file_name])" /> </int:transaction-synchronization-factory> <!-- Metadatastore configuration --> <bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource"> <property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/> <property name="driverClassName" value="org.h2.Driver"/> <property name="username" value="${database.username}"/> <property name="password" value="${database.password}"/> <property name="maxIdle" value="4"/> <property name="defaultAutoCommit" value="false"/> </bean> <bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore"> <constructor-arg ref="jdbcDataSource"/> </bean> <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager"> <property name="dataSource" ref="jdbcDataSource"/> </bean> <!-- Workflow --> <int:chain input-channel="processChannel" output-channel="outputChannel"> <int:service-activator ref="fileActivator" method="fileRead"/> <int:service-activator ref="fileActivator" method="fileProcess"/> <int:service-activator ref="fileActivator" method="fileAudit"/> </int:chain> <!-- Output --> <int-file:outbound-channel-adapter id="outputChannel" directory="file:${output.files.path}" filename-generator-expression ="payload.name"> <!-- Delete the source file --> <int-file:request-handler-advice-chain> <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice"> <property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/> </bean> </int-file:request-handler-advice-chain> </int-file:outbound-channel-adapter>
欢迎任何改进或其他解决方案。