s3 入站通道适配器中的轮询器/适配器无法正常工作



我的方案是,每当文件被修改时,仅将一个文件从 AWS S3 存储桶传输到 EC2 实例一次。我使用以下配置并在服务器启动时手动启动适配器。

问题是服务器启动时执行重复 5 或 6 次。看起来正在发生不同的线程执行。我能够在日志中看到不同的任务执行器,不知道这是轮询器问题还是适配器问题。

我正在使用服务激活器根据 S3 位置中的文件更改执行其他一些操作。

注意:此问题仅在启动时发生一次。它工作正常,进一步修改文件。

配置:

<bean id="s3SessionFactory" 
class="org.springframework.integration.aws.support.S3SessionFactory"></bean>
<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />
<task:executor id="s3PollingExecutor" pool-size="1" queue-capacity="10" />
<integration:channel id="s3FilesChannel"/>
<int-aws:s3-inbound-channel-adapter id="s3FileInbound"
channel="s3FilesChannel" 
session-factory="s3SessionFactory" 
auto-create-local-directory="false"
delete-remote-files="false" 
preserve-timestamp="true"
filter="acceptOnceFilter"
local-directory="local_directory"
auto-startup="false" 
remote-directory="s3_bucket">
<integration:poller id="s3FilesChannelPoller" 
fixed-rate="1000" 
max-messages-per-poll="1" time-unit="MILLISECONDS" 
task-executor="s3PollingExecutor">
</integration:poller>
</int-aws:s3-inbound-channel-adapter>
<integration:service-activator id="s3FilesChannelWatcher" 
input-channel="s3FilesChannel" 
output-channel="nullChannel"
ref="configurationFileWatcher" 
method="getConfigurationFileWatcher">
</integration:service-activator>                                      

正如您所建议的,我已经尝试了以下方法.

<bean id="acceptOnceFilterRegion"
class="cS3FileFilterOnLastModifiedTime">
<constructor-arg index="0" ref="metaDataStoreRegion"/>
<constructor-arg index="1" value="*"/>
</bean>                                                                                                

添加了用于检查上次修改时间的逻辑

import org.springframework.integration.aws.support.filters.S3PersistentAcceptOnceFileListFilter;
import org.springframework.integration.metadata.ConcurrentMetadataStore;
import com.amazonaws.services.s3.model.S3ObjectSummary;
public class S3FileFilterOnLastModifiedTime extends S3PersistentAcceptOnceFileListFilter {
Long delayTime = 1000L;
public S3FileFilterOnLastModifiedTime(ConcurrentMetadataStore store, String prefix) {
super(store, prefix);
}
@Override
public boolean accept(S3ObjectSummary file) {
long lastModified = modified(file);
long currentTime = System.currentTimeMillis();
long timeDifference = currentTime - lastModified;
return timeDifference > delayTime;
}   
} 

仍然没有希望.logs是这样的.......

[INFO ] 2018-10-11 11:22:10,888 [s3PollingExecutor-1] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
[INFO ] 2018-10-11 11:22:10,892 [s3PollingExecutor-2] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
[INFO ] 2018-10-11 11:22:10,892 [s3PollingExecutor-3] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
[INFO ] 2018-10-11 11:22:10,893 [s3PollingExecutor-4] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
[INFO ] 2018-10-11 11:22:10,893 [s3PollingExecutor-5] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully
[INFO ] 2018-10-11 11:22:10,894 [s3PollingExecutor-6] ConfigurationSettingWatcher {} - ConfigurationSettingWatcher Started succesfully

我认为您错过了这样一个事实,即您的s3_bucket在启动时不为空,并且<int-aws:s3-inbound-channel-adapter>在启动时拾取所有文件。这就是您如何看待这几个任务:每个任务针对每个文件。

如果您真的只担心该存储桶中的新文件,则需要考虑不要使用内存中AcceptOnceFileListFilter,而是基于共享MetadataStore实现切换到一些持久实现。为此,spring-integration-aws中有一个S3PersistentAcceptOnceFileListFilterDynamoDbMetadataStore将筛选结果保存到 AWS 上的DynamoDb中:https://github.com/spring-projects/spring-integration-aws#metadata-store-for-amazon-dynamodb

最新更新