同步 S3 存储桶并侦听更改



我有一个 AWS S3 存储桶,我每周都会在其中放置一个新的 ZIP 文件。

我想向使用 Spring Boot 编写的现有 Web 服务添加一项功能:在本地同步存储桶并观察更改。

目前,同步运行良好:每当有新文件添加到存储桶时,它都会被下载到本地。但是,我不知道如何侦听文件更新,也就是说,如何让某个方法在新文件下载到本地时被触发。这能做到吗?

这是我的代码段:

#  --------
# | AWS S3 |
#  --------
# Static AWS credentials (masked here); S3Config reads them through @Value.
s3.credentials-access-key=***
s3.credentials-secret-key=****
# Bucket name and folder inside it; S3Config joins them as "<bucket>/<remote-dir>".
s3.bucket = my-bucket
s3.remote-dir = zips
# Local directory that S3Config passes to the message source as its local directory.
s3.local-dir = D:/s3-bucket/
@Log4j2
@Configuration
public class S3Config {
public static final String OUT_CHANNEL_NAME = "s3filesChannel";
@Value("${s3.credentials-access-key}") private String accessKey;
@Value("${s3.credentials-secret-key}") private String secretKey;
@Value("${s3.remote-dir}") private String remoteDir;
@Value("${s3.bucket}") private String s3bucket;
@Value("${s3.local-dir}") private String localDir;
/*
* AWS S3
*/
@Bean
public AmazonS3 getAmazonS3(
){
BasicAWSCredentials creds = new BasicAWSCredentials(accessKey, secretKey);
AmazonS3 s3client = AmazonS3ClientBuilder
.standard()
.withRegion(Regions.EU_WEST_1)
.withCredentials(new AWSStaticCredentialsProvider(creds))
.build();
return s3client;        
}
@Bean
public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
return new S3SessionFactory(pAmazonS3);
}
@Bean
public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory pS3SessionFactory) {
S3InboundFileSynchronizer sync = new S3InboundFileSynchronizer(pS3SessionFactory);
sync.setPreserveTimestamp(true);
sync.setDeleteRemoteFiles(false);
String fullRemotePath = s3bucket.concat("/").concat(remoteDir);
sync.setRemoteDirectory(fullRemotePath);
sync.setFilter(new S3RegexPatternFileListFilter(".*\.zip$"));
return sync;
}
@Bean
@InboundChannelAdapter(value = OUT_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
S3InboundFileSynchronizer pS3InboundFileSynchronizer
) {
S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(pS3InboundFileSynchronizer);
messageSource.setAutoCreateLocalDirectory(true);
messageSource.setLocalDirectory(new File(localDir));
messageSource.setLocalFilter(new AcceptOnceFileListFilter<File>());
return messageSource;
}
@Bean("s3filesChannel")
public PollableChannel s3FilesChannel() {
return new QueueChannel();
}
@Bean
public IntegrationFlow fileReadingFlow(
S3InboundFileSynchronizingMessageSource pS3InboundFileSynchronizingMessageSource,
GtfsBizkaibus pGtfsBizkaibus,
@Qualifier("fileProcessor") MessageHandler pMessageHandler) {
return IntegrationFlows
.from(pS3InboundFileSynchronizingMessageSource, e -> e.poller(p -> p.fixedDelay(5, TimeUnit.SECONDS)))
.handle(pMessageHandler)
.get();
}
@Bean("fileProcessor")
public MessageHandler fileProcessor() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(localDir));
handler.setExpectReply(false); // end of pipeline, reply not needed
handler.setFileExistsMode(FileExistsMode.APPEND);
handler.setNewFileCallback((file, msg) -> {
log.debug("New file created... " + file.getAbsolutePath());
});
return handler;
}

您可以使用 S3 事件通知和 SQS 队列。基本上,当对象被添加到存储桶时,S3 可以将事件发布到已注册的 SQS 队列。然后,您可以让本地应用程序对该队列进行长轮询以获取新事件,并处理收到的每个事件。

有关更多信息,请参阅此处。

实际上,S3InboundFileSynchronizingMessageSource为您完成了所有必要的工作:将新文件添加到远程存储桶时,该文件将下载到本地目录,并在要发送到配置通道的消息中作为payload生成。

修改远程文件时,也会将其下载到本地目录。

从版本5.0开始,AbstractInboundFileSynchronizingMessageSource提供了此选项:

/**
 * Turn the {@code FileReadingMessageSource.WatchServiceDirectoryScanner} of the local
 * {@link FileReadingMessageSource} on or off.
 * When it is turned on, the local source is additionally configured to watch the
 * {@code CREATE}, {@code MODIFY} and {@code DELETE} event types.
 * @param useWatchService {@code true} to switch the local source to
 * {@code FileReadingMessageSource.WatchServiceDirectoryScanner}; {@code false} otherwise.
 * @since 5.0
 */
public void setUseWatchService(boolean useWatchService) {
    this.fileSource.setUseWatchService(useWatchService);
    if (!useWatchService) {
        // Nothing else to configure when the watch service stays disabled.
        return;
    }
    this.fileSource.setWatchEvents(
            FileReadingMessageSource.WatchEventType.CREATE,
            FileReadingMessageSource.WatchEventType.MODIFY,
            FileReadingMessageSource.WatchEventType.DELETE);
}

如果这对你有意义。

不过,使用 S3 到 SQS 的事件通知也是一个很好的解决方案:Spring Integration AWS 项目中提供了 SqsMessageDrivenChannelAdapter。

最后,正如 @Artem Bilan 所建议的那样,我使用了 @ServiceActivator 注解。这是完整的示例:

import java.io.File;
import java.util.concurrent.TimeUnit;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.aws.inbound.S3InboundFileSynchronizer;
import org.springframework.integration.aws.inbound.S3InboundFileSynchronizingMessageSource;
import org.springframework.integration.aws.support.S3SessionFactory;
import org.springframework.integration.aws.support.filters.S3RegexPatternFileListFilter;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import lombok.extern.log4j.Log4j2;
@Log4j2
@Configuration
public class S3Config {
public static final String IN_CHANNEL_NAME = "s3filesChannel";
@Value("${s3.credentials-access-key}") private String accessKey;
@Value("${s3.credentials-secret-key}") private String secretKey;
@Value("${s3.remote-dir}") private String remoteDir;
@Value("${s3.bucket}") private String s3bucket;
@Value("${s3.local-dir}") private String localDir;
/*
* AWS S3
*/
@Bean
public AmazonS3 getAmazonS3(
){
BasicAWSCredentials creds = new BasicAWSCredentials(accessKey, secretKey);
AmazonS3 s3client = AmazonS3ClientBuilder
.standard()
.withRegion(Regions.EU_WEST_1)
.withCredentials(new AWSStaticCredentialsProvider(creds))
.build();
return s3client;        
}
@Bean
public S3SessionFactory s3SessionFactory(AmazonS3 pAmazonS3) {
return new S3SessionFactory(pAmazonS3);
}
@Bean
public S3InboundFileSynchronizer s3InboundFileSynchronizer(S3SessionFactory pS3SessionFactory) {
S3InboundFileSynchronizer sync = new S3InboundFileSynchronizer(pS3SessionFactory);
sync.setPreserveTimestamp(true);
sync.setDeleteRemoteFiles(false);
String fullRemotePath = s3bucket.concat("/").concat(remoteDir);
sync.setRemoteDirectory(fullRemotePath);
sync.setFilter(new S3RegexPatternFileListFilter(".*\.zip$"));
return sync;
}
@Bean
@InboundChannelAdapter(value = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
public S3InboundFileSynchronizingMessageSource s3InboundFileSynchronizingMessageSource(
S3InboundFileSynchronizer pS3InboundFileSynchronizer
) {
S3InboundFileSynchronizingMessageSource messageSource = new S3InboundFileSynchronizingMessageSource(pS3InboundFileSynchronizer);
messageSource.setAutoCreateLocalDirectory(true);
messageSource.setLocalDirectory(new File(localDir));
messageSource.setUseWatchService(true);
return messageSource;
}
@Bean("s3filesChannel")
public PollableChannel s3FilesChannel() {
return new QueueChannel();
}
@Bean
public IntegrationFlow fileReadingFlow(
S3InboundFileSynchronizingMessageSource pS3InboundFileSynchronizingMessageSource,
@Qualifier("fileProcessor") MessageHandler pMessageHandler) {
return IntegrationFlows
.from(pS3InboundFileSynchronizingMessageSource, e -> e.poller(p -> p.fixedDelay(5, TimeUnit.SECONDS)))
.handle(pMessageHandler)
.get();
}
@Bean("fileProcessor")
public MessageHandler fileProcessor() {
FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(localDir));
handler.setExpectReply(false); // end of pipeline, reply not needed
handler.setFileExistsMode(FileExistsMode.REPLACE);
return handler;
}
@ServiceActivator(inputChannel = IN_CHANNEL_NAME, poller = @Poller(fixedDelay = "30"))
public void asada(Message<?> message) {
// TODO
log.debug("<< New message!");
}
}

请注意,我已将 OUT_CHANNEL_NAME 重命名为 IN_CHANNEL_NAME(两者的值相同,都是 "s3filesChannel")。

PS:我几乎是Spring Integration的新手,所以我仍在学习它的概念。

相关内容

最新更新