是否有将小文件写入S3作为实时流媒体



我在一分钟内得到了20k个1kb到3kb大小的小xml文件。我必须在它到达目录时写入所有文件。

有时,传入文件的速度会增加到每分钟100k。java或aws-api中有什么东西可以帮助我匹配传入速度吗?

我正在使用uploadFileList()API上传所有文件。我也尝试过观察事件,这样当文件到达文件夹时,它会将该文件上传到S3,但与传入的文件相比,这太慢了,会产生大量的积压。

我也尝试过多线程,但如果我旋转更多的线程,我会从S3reduce you request rate error.得到错误有时我会得到低于的错误

AmazonServiceException:com.amazonaws.services.s3.model.AmazonS3异常:您的套接字未在中读取或写入到服务器的连接超时时间。空闲连接将关闭。

但当我不使用线程时,我不会得到这个错误

我也尝试过的另一种方法是创建一个大文件,然后上传到S3,然后在S3中,我再次将其拆分为小文件,这很好,但这种解决方案会延迟文件上传到S3中,并影响从S3访问该文件的用户。

我知道上传小文件到S3是不合适的,但我有这样的用例。

我注意到的速度是在一分钟内上传5公里的文件。

有人能建议一些替代方法吗?这样我上传文件的速度每分钟至少会增加15k。

我正在分享我的完整代码,我正在尝试使用多线程应用程序上传

第一类,我在其中创建要放入线程的文件

public class FileProcessThreads {
public  ArrayList process(String fileLocation)  {
File dir = new File(fileLocation);
File[] directoryListing = dir.listFiles();
ArrayList<File> files = new ArrayList<File>();
if (directoryListing.length > 0) {
for (File path : directoryListing) {
files.add(path);
}
}
return files;
}
}

第2类,我在其中创建线程池和执行程序

public class UploadExecutor {
private static String fileLocation = "C:\Users\u6034690\Desktop\ONEFILE";
// private static String fileLocation="D:\TRFAudits_Moved\";
private static final String _logFileName = "s3FileUploader.log";
private static Logger _logger = Logger.getLogger(UploadExecutor.class);
@SuppressWarnings("unchecked")
public static void main(String[] args) {
_logger.info("----------Stating application's  main method----------------- ");
AWSCredentials credential = new ProfileCredentialsProvider("TRFAuditability-Prod-ServiceUser").getCredentials();
final ClientConfiguration config = new ClientConfiguration();
AmazonS3Client s3Client = (AmazonS3Client) AmazonS3ClientBuilder.standard().withRegion("us-east-1")
.withCredentials(new AWSStaticCredentialsProvider(credential)).withForceGlobalBucketAccessEnabled(true)
.build();
s3Client.getClientConfiguration().setMaxConnections(100);
TransferManager tm = new TransferManager(s3Client);
while (true) {
FileProcessThreads fp = new FileProcessThreads();
List<File> records = fp.process(fileLocation);
while (records.size() <= 0) {
try {
_logger.info("No records found willl wait for 10 Seconds");
TimeUnit.SECONDS.sleep(10);
records = fp.process(fileLocation);
} catch (InterruptedException e) {
_logger.error("InterruptedException: " + e.toString());
}
}
_logger.info("Total no of Audit files = " + records.size());
ExecutorService es = Executors.newFixedThreadPool(2);
int recordsInEachThread = (int) (records.size() / 2);
_logger.info("No of records in each thread = " + recordsInEachThread);
UploadObject my1 = new UploadObject(records.subList(0, recordsInEachThread), tm);
UploadObject my2 = new UploadObject(records.subList(recordsInEachThread, records.size()), tm);
es.execute(my1);
es.execute(my2);
es.shutdown();
try {
boolean finshed = es.awaitTermination(1, TimeUnit.MINUTES);
if (!finshed) {
Thread.sleep(1000);
}
} catch (InterruptedException e) {
_logger.error("InterruptedException: " + e.toString());
}
}
}

}

我上传文件到S3 的最后一个类

public class UploadObject implements Runnable{
static String bucketName = "a205381-auditxml/S3UPLOADER";
private String fileLocation="C:\Users\u6034690\Desktop\ONEFILE";
//private String fileLocation="D:\TRFAudits\";
//static String bucketName = "a205381-auditxml/S3UPLOADER"; 
private static Logger _logger;
List<File> records;
TransferManager tm;
UploadObject(List<File> list,TransferManager tm){
this.records = list;
this.tm=tm;
_logger = Logger.getLogger(UploadObject.class);
}
public void run(){
System.out.println(Thread.currentThread().getName() + " : ");
uploadToToS3();
}
public  void uploadToToS3() {
_logger.info("Number of record to be processed in current thread: : "+records.size());
MultipleFileUpload xfer = tm.uploadFileList(bucketName, "TEST",new File(fileLocation), records);
try {
xfer.waitForCompletion();
TransferState xfer_state = xfer.getState();
_logger.info("Upload status -----------------" + xfer_state);
for (File file : records) {
try {
Files.delete(FileSystems.getDefault().getPath(file.getAbsolutePath()));
} catch (IOException e) {
System.exit(1);
_logger.error("IOException: "+e.toString());
}
}
_logger.info("Successfully completed file cleanse");
} catch (AmazonServiceException e) {
_logger.error("AmazonServiceException: "+e.toString());
System.exit(1);
} catch (AmazonClientException e) {
_logger.error("AmazonClientException: "+e.toString());
System.exit(1);
} catch (InterruptedException e) {
_logger.error("InterruptedException: "+e.toString());
System.exit(1);
}
System.out.println("Completed");
_logger.info("Upload completed");
_logger.info("Calling Transfer manager shutdown");
//tm.shutdownNow();
}

}

这听起来像是在跳闸S3的内置保护(下面引用的文档)。我还在下面列出了一些类似的问题;其中一些建议重新使用SQS来均衡和分配S3上的负载。

除了引入更多移动部件外,您还可以重复使用S3Client和TransferManager。将它们从可运行对象中上移,并将它们传递到其构造函数中。TransferManager本身根据javadoc使用多线程。

在可能的情况下,TransferManager尝试使用多个线程同时上传单个上传的多个部分。当处理大的内容大小和高带宽时,这可以显著提高吞吐量。

您还可以增加S3Client使用的最大同时连接数。

也许:

CCD_ 5或甚至更高。

CCD_ 6被设置为50。

最后,您可以尝试上传到bucket下的不同前缀/文件夹,如下所述,以实现高请求率的扩展。

当前AWS请求率和性能指南

AmazonS3自动扩展到高请求率。例如,您的应用程序在一个bucket中的每个前缀每秒至少可以实现3500个PUT/POST/DELETE和5500个GET请求。存储桶中的前缀数量没有限制。以指数形式提高读写性能很简单。例如,如果在AmazonS3存储桶中创建10个前缀来并行读取,则可以将读取性能扩展到每秒55000个读取请求。

当前的AWS S3错误最佳实践

针对重复减速错误调整应用程序

与任何分布式系统一样,S3具有检测有意或无意的资源过度消耗并做出相应反应的保护机制。当高请求速率触发其中一种机制时,可能会发生SlowDown错误。降低请求率将减少或消除此类错误。一般来说,大多数用户不会经常遇到这些错误;但是,如果您想要更多信息,或者遇到了严重或意外的SlowDown错误,请发布到我们的AmazonS3开发者论坛https://forums.aws.amazon.com/或注册AWS高级支持https://aws.amazon.com/premiumsupport/.

类似问题:

S3慢下来:请降低您的请求率异常

亚马逊网络服务S3请求限制

AWS论坛-最大限度地重用S3 getObjectMetadata()调用的连接

S3传输加速不一定能提供更快的上传速度。当从同一地区使用时,它有时比正常上传慢。亚马逊S3传输加速使用他们在世界各地拥有的AWS边缘基础设施,更快地将数据传输到AWS主干网。当您使用Amazon S3 Transfer Acceleration时,您的请求会根据延迟路由到最佳的AWS边缘位置。然后,Transfer Acceleration将使用优化的网络协议、从边缘到原点的持久连接、完全打开的发送和接收窗口等,通过AWS管理的骨干网络将您的上传内容发送回S3。因为你已经在该地区了,所以使用它不会有任何好处。但是,最好从https://s3-accelerate-speedtest.s3-accelerate.amazonaws.com/en/accelerate-speed-comparsion.html

最新更新