我正在使用带有自定义过滤器的 SFTP 入站适配器。我喜欢实现的是,如果过滤器接受文件,则让它通过消息流移动,该文件将使用"批处理集成:作业启动网关"传递到春季批处理作业,这工作正常。现在的问题是,如果过滤器拒绝它,那么我希望它移动到一些目录,如"/failed/",如果在批处理期间如果 jdbc 事务失败,那么我也希望它移动到"/failed/"目录,以便我们稍后处理它们。我们如何在春季集成中将 jdbc 事务与此场景同步。
我在春季集成中阅读了此文档(http://docs.spring.io/spring-integration/reference/html/transactions.html),但不清楚,示例应用程序将是完美的,否则一些提示也可以。
配置
<context:component-scan base-package="com.sftp.test" />
<import resource="beans-context.xml" />
<!-- Default poller -->
<int:poller default="true" fixed-delay="5000"/>
<!-- SFTP inbound poller -->
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="fileMessageChannel" session-factory="sftpSessionFactory"
local-directory="${test.sftp.local.dir}" remote-directory="${test.sftp.remote.dir}"
remote-file-separator="/" auto-create-local-directory="true"
delete-remote-files="true" filename-pattern="*" local-filter="testFileFilter"
preserve-timestamp="true" temporary-file-suffix=".writing"
>
<int:poller cron="${test.file.poll.frequency}"
max-messages-per-poll="1">
<int:transactional transaction-manager="transactionManager"
synchronization-factory="syncFactory" />
</int:poller>
</int-sftp:inbound-channel-adapter>
<int:transaction-synchronization-factory
id="syncFactory">
<int:after-commit expression="payload.renameTo('/tmp/dds/test/success/' + payload.name)"
channel="committedChannel" />
<int:after-rollback expression="payload.renameTo('/tmp/dds/test/failed/' + payload.name)"
channel="rolledBackChannel" />
</int:transaction-synchronization-factory>
<int:channel id="committedChannel">
<int:queue />
</int:channel>
<int:logging-channel-adapter channel="committedChannel" />
<int:channel id="rolledBackChannel">
<int:queue />
</int:channel>
<int:logging-channel-adapter channel="rolledBackChannel" />
<!-- Channel where message with file name will be dropped by inbound sftp
channel adapter -->
<int:channel id="fileMessageChannel">
<int:queue />
</int:channel>
<!-- Transform spring integration message to job launch request -->
<int:transformer input-channel="fileMessageChannel"
output-channel="jobRequestChannel" id="jobLaunchMessageTransformer">
<bean class="com.sftp.test.test.util.FileMessageToJobRequest">
<property name="job" ref="testJob" />
<property name="fileParameterName" value="fileName" />
</bean>
</int:transformer>
<!-- Job request channel -->
<int:channel id="jobRequestChannel">
<int:queue />
</int:channel>
<!-- The JobLaunchingGateway is used to launch Batch Jobs. Internally it
delegates to a JobLaunchingMessageHandler. -->
<batch-integration:job-launching-gateway
request-channel="jobRequestChannel" reply-channel="jobLaunchReplyChannel" />
<!-- job response channel -->
<int:channel id="jobLaunchReplyChannel">
<int:queue />
</int:channel>
<!-- Logging response received from job on jobLaunchReplyChannel -->
<int:outbound-channel-adapter channel="jobLaunchReplyChannel" ref="fileProcessAdapter" method="moveFile"/>
问题是你使用的是QueueChannel
s。一旦您将请求交给另一个线程,轮询器事务就会"提交"。
您需要在轮询器线程上运行作业(从通道中删除<queue/>
元素)。如果要同时运行多个作业,请将任务执行程序添加到轮询器。
此外,为了拒绝和重命名文件,您不能使用内部筛选器,因为筛选的文件不会生成消息。您可以在筛选器中执行重命名,也可以在适配器中使用AcceptAllFileListFilter
并在流中使用<filter/>
(配置为在拒绝时引发异常)。