Spring Integration:使用多实例重试配置



我在4个不同的服务器上运行4个基于Spring Boot Integration的应用程序实例。过程是:

  1. 在共享文件夹中逐个读取XML文件
  2. 处理文件(检查结构、内容…(、转换数据并发送电子邮件
  3. 在另一个共享文件夹中编写有关此文件的报告
  4. 删除已成功处理的文件

我正在寻找一个非阻塞和安全的解决方案来处理这些文件。

用例:

  • 如果一个实例在读取或处理文件时崩溃(因此不会结束集成链(:另一个实例必须处理该文件,或者同一个实例重新启动后必须处理该
  • 如果一个实例正在处理文件,则其他实例不得处理该文件

我已经构建了这个Spring Integration XML配置文件(它包括带有共享H2数据库的JDBC元数据存储(:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file.xsd">
<int:poller default="true" fixed-rate="1000"/>
<int:channel id="inputFilesChannel">
<int:queue/>
</int:channel>
<!-- Input -->
<int-file:inbound-channel-adapter 
id="inputFilesAdapter"
channel="inputFilesChannel"
directory="file:${input.files.path}" 
ignore-hidden="true"
comparator="lastModifiedFileComparator"
filter="compositeFilter">
<int:poller fixed-rate="10000" max-messages-per-poll="1"  task-executor="taskExecutor"/>
</int-file:inbound-channel-adapter>
<task:executor id="taskExecutor" pool-size="1"/>
<!-- Metadatastore -->
<bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
<property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
<property name="driverClassName" value="org.h2.Driver"/>
<property name="username" value="${database.username}"/>
<property name="password" value="${database.password}"/>
<property name="maxIdle" value="4"/>
</bean>
<bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
<constructor-arg ref="jdbcDataSource"/>
</bean>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="jdbcDataSource"/>
</bean>
<bean id="compositeFilter" class="org.springframework.integration.file.filters.CompositeFileListFilter">
<constructor-arg>
<list>
<bean class="org.springframework.integration.file.filters.FileSystemPersistentAcceptOnceFileListFilter">
<constructor-arg index="0" ref="jdbcMetadataStore"/>
<constructor-arg index="1" value="files"/>
</bean>
</list>
</constructor-arg>
</bean>
<!-- Workflow -->
<int:chain input-channel="inputFilesChannel" output-channel="outputFilesChannel">
<int:service-activator ref="fileActivator" method="fileRead"/>
<int:service-activator ref="fileActivator" method="fileProcess"/>
<int:service-activator ref="fileActivator" method="fileAudit"/>
</int:chain>
<bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/>
<int-file:outbound-channel-adapter 
id="outputFilesChannel" 
directory="file:${output.files.path}"
filename-generator-expression ="payload.name">
<int-file:request-handler-advice-chain>
<bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
<property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/>
</bean>
</int-file:request-handler-advice-chain>
</int-file:outbound-channel-adapter>
</beans>

问题:对于多个文件,当成功处理一个文件时,事务会提交元数据存储中的其他现有文件(表INT_METADATA_STORE(。因此,如果重新启动应用程序,其他文件将永远不会被处理(如果应用程序在处理第一个文件时崩溃,它会正常工作(。它似乎只适用于读取文件,而不适用于处理集成链中的文件。。。如何逐文件管理JVM崩溃时的回滚事务?

非常感谢您的帮助。这会让我发疯的:(

谢谢!

编辑/注释:

  • 灵感来自https://github.com/caoimhindenais/spring-integration-files/blob/master/src/main/resources/context.xml

  • 我已经用Artem Bilan的回答更新了我的配置。并删除poller块中的transactional块:我在实例之间发生了事务冲突(丑陋的表锁异常(。尽管行为是一样的

  • 我在poller块中测试此配置失败(行为相同(:

    <int:advice-chain>
    <tx:advice id="txAdvice" transaction-manager="transactionManager">
    <tx:attributes>
    <tx:method name="file*" timeout="30000" propagation="REQUIRED"/>
    </tx:attributes>
    </tx:advice>
    </int:advice-chain>
    
  • 也许基于Idempontent Receiver Enterprise Integration Pattern的解决方案可以工作。但我没能配置它……我找不到确切的文档

您不应该使用PseudoTransactionManager,而应该使用DataSourceTransactionManager

由于使用JdbcMetadataStore,它将参与事务,如果下游流失败,元数据存储中的条目也将回滚。

好的。我找到了一个可行的解决方案。也许不是最干净的,但它有效:

  • 多个实例位于不同的服务器上,共享同一个H2数据库(网络文件夹装载(。我认为它应该通过远程TCP工作。MVCC已在H2上激活(请查看其文档(
  • inbound-channel-adapter激活了scan-each-poll选项,以允许重新填充以前可以忽略的文件(如果进程已经由另一个实例开始(。因此,如果另一个实例崩溃,则可以在不重新启动该实例的情况下再次轮询和处理该文件
  • 在DB上将选项defaultAutoCommit设置为false
  • 我没有使用FileSystemPersistentAcceptOnceFileListFilter,因为当一个文件被成功处理时,它正在聚合元数据存储中的所有读取文件。我没能在我的上下文中使用它
  • 我通过过滤器和事务同步在表达式中编写了自己的条件和操作。

    <!-- Input -->
    <bean id="lastModifiedFileComparator" class="org.apache.commons.io.comparator.LastModifiedFileComparator"/>
    <int-file:inbound-channel-adapter
    id="inputAdapter"
    channel="inputChannel"
    directory="file:${input.files.path}"
    comparator="lastModifiedFileComparator"
    scan-each-poll="true">
    <int:poller max-messages-per-poll="1" fixed-rate="5000">
    <int:transactional transaction-manager="transactionManager"  isolation="READ_COMMITTED" propagation="REQUIRED" timeout="60000" synchronization-factory="syncFactory"/>
    </int:poller>
    </int-file:inbound-channel-adapter>
    <!-- Continue only if the concurrentmetadatastore doesn't contain the file. If if is not the case : insert it in the metadatastore -->
    <int:filter input-channel="inputChannel" output-channel="processChannel"  discard-channel="nullChannel" throw-exception-on-rejection="false" expression="@jdbcMetadataStore.putIfAbsent(headers[file_name], headers[timestamp]) == null"/>
    <!-- Rollback by removing the file from the metadatastore -->
    <int:transaction-synchronization-factory id="syncFactory">
    <int:after-rollback expression="@jdbcMetadataStore.remove(headers[file_name])" />
    </int:transaction-synchronization-factory>
    <!-- Metadatastore configuration -->
    <bean id="jdbcDataSource" class="org.apache.commons.dbcp.BasicDataSource">
    <property name="url" value="jdbc:h2:file:${database.path}/shared;AUTO_SERVER=TRUE;AUTO_RECONNECT=TRUE;MVCC=TRUE"/>
    <property name="driverClassName" value="org.h2.Driver"/>
    <property name="username" value="${database.username}"/>
    <property name="password" value="${database.password}"/>
    <property name="maxIdle" value="4"/>
    <property name="defaultAutoCommit" value="false"/>
    </bean>
    <bean id="jdbcMetadataStore" class="org.springframework.integration.jdbc.metadata.JdbcMetadataStore">
    <constructor-arg ref="jdbcDataSource"/>
    </bean>
    <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
    <property name="dataSource" ref="jdbcDataSource"/>
    </bean>
    <!-- Workflow -->
    <int:chain input-channel="processChannel" output-channel="outputChannel">
    <int:service-activator ref="fileActivator" method="fileRead"/>
    <int:service-activator ref="fileActivator" method="fileProcess"/>
    <int:service-activator ref="fileActivator" method="fileAudit"/>
    </int:chain>
    
    <!-- Output -->
    <int-file:outbound-channel-adapter 
    id="outputChannel" 
    directory="file:${output.files.path}"
    filename-generator-expression ="payload.name">
    <!-- Delete the source file -->
    <int-file:request-handler-advice-chain>
    <bean class="org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice">
    <property name="onSuccessExpressionString" value="headers[file_originalFile].delete()"/>
    </bean>
    </int-file:request-handler-advice-chain>
    </int-file:outbound-channel-adapter>
    

欢迎任何改进或其他解决方案。

最新更新