弹簧集成 - SFTP 入站适配器和事务同步



我正在使用带有自定义过滤器的 SFTP 入站适配器。我喜欢实现的是,如果过滤器接受文件,则让它通过消息流移动,该文件将使用"批处理集成:作业启动网关"传递到春季批处理作业,这工作正常。现在的问题是,如果过滤器拒绝它,那么我希望它移动到一些目录,如"/failed/",如果在批处理期间如果 jdbc 事务失败,那么我也希望它移动到"/failed/"目录,以便我们稍后处理它们。我们如何在春季集成中将 jdbc 事务与此场景同步。

我在春季集成中阅读了此文档(http://docs.spring.io/spring-integration/reference/html/transactions.html),但不清楚,示例应用程序将是完美的,否则一些提示也可以。

配置

<context:component-scan base-package="com.sftp.test" />
<import resource="beans-context.xml" />
<!-- Default poller -->
<int:poller default="true" fixed-delay="5000"/>

<!-- SFTP inbound poller -->
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
    channel="fileMessageChannel" session-factory="sftpSessionFactory"
    local-directory="${test.sftp.local.dir}" remote-directory="${test.sftp.remote.dir}"
    remote-file-separator="/" auto-create-local-directory="true"
    delete-remote-files="true" filename-pattern="*" local-filter="testFileFilter"
    preserve-timestamp="true" temporary-file-suffix=".writing"
    >
    <int:poller cron="${test.file.poll.frequency}"
        max-messages-per-poll="1">
        <int:transactional transaction-manager="transactionManager"
            synchronization-factory="syncFactory" /> 
    </int:poller>
</int-sftp:inbound-channel-adapter>
<int:transaction-synchronization-factory
    id="syncFactory">
    <int:after-commit expression="payload.renameTo('/tmp/dds/test/success/' + payload.name)"
        channel="committedChannel" />
    <int:after-rollback expression="payload.renameTo('/tmp/dds/test/failed/' + payload.name)"
        channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
<int:channel id="committedChannel">
    <int:queue />
</int:channel>
<int:logging-channel-adapter channel="committedChannel" />

<int:channel id="rolledBackChannel">
    <int:queue />
</int:channel>
<int:logging-channel-adapter channel="rolledBackChannel" />

<!-- Channel where message with file name will be dropped by inbound sftp 
    channel adapter -->
<int:channel id="fileMessageChannel">
    <int:queue />
</int:channel>
<!-- Transform spring integration message to job launch request -->
<int:transformer input-channel="fileMessageChannel"
    output-channel="jobRequestChannel" id="jobLaunchMessageTransformer">
    <bean class="com.sftp.test.test.util.FileMessageToJobRequest">
        <property name="job" ref="testJob" />
        <property name="fileParameterName" value="fileName" />
    </bean>
</int:transformer>
<!-- Job request channel -->
<int:channel id="jobRequestChannel">
    <int:queue />
</int:channel>
<!-- The JobLaunchingGateway is used to launch Batch Jobs. Internally it 
    delegates to a JobLaunchingMessageHandler. -->
<batch-integration:job-launching-gateway
    request-channel="jobRequestChannel" reply-channel="jobLaunchReplyChannel" />
<!-- job response channel -->
<int:channel id="jobLaunchReplyChannel">
    <int:queue />
</int:channel>
<!-- Logging response received from job on jobLaunchReplyChannel -->
<int:outbound-channel-adapter channel="jobLaunchReplyChannel"   ref="fileProcessAdapter"    method="moveFile"/>

问题是你使用的是QueueChannel s。一旦您将请求交给另一个线程,轮询器事务就会"提交"。

您需要在轮询器线程上运行作业(从通道中删除<queue/>元素)。如果要同时运行多个作业,请将任务执行程序添加到轮询器。

此外,为了拒绝和重命名文件,您不能使用内部筛选器,因为筛选的文件不会生成消息。您可以在筛选器中执行重命名,也可以在适配器中使用AcceptAllFileListFilter并在流中使用<filter/>(配置为在拒绝时引发异常)。

最新更新