FileInputStream/FileOutputStream与FSDataInputStream/FSDataOut



我试图理解FileInputStream与FSDataInputStream以及FileOutputStream与FSDataOutputStream之间的区别。

我正试图从S3存储桶中读取一个文件,并应用一些格式更改,然后想将其写入spark java应用程序中的另一个S3存储桶

我对是否需要使用FileInputStream或FSDataInputStream来读取文件以及如何使用FileOutputStream或FSDataOutputStream将文件写入S3存储桶感到困惑。

有人能用一些例子来解释我们需要如何以及在哪里适当地使用它们吗?

FSDataInputStreamFSDataOutputStream是hadoop公共中的类

FSDataInputStream

FSDataInputStream添加了用于将特定偏移量的数据读取到字节阵列(PositionedReadable(或字节缓冲区中的高性能API。这些被广泛用于读取文件的库,其中读取不是顺序的,更随机的IO、parquet、orc等。FileSystem实现通常提供这些文件的高效实现。这些api与简单的文件复制无关,除非您真的在努力实现最大性能,已经向多个流打开了同一个源文件,并且正在跨流并行获取块。Distcp会这样做,这就是为什么如果你努力,它会使网络过载的原因。

完整规范,包括PositionedReadable:fsdatainputstream

FSDataOutputStream

CCD_ 5不会对正常的CCD_ 6增加那么多;最重要的接口是Syncable,其hflush和hsync调用具有关于持久性的特定保证,如";当它们返回时,数据已经被持久化到HDFS或其他文件系统,用于hsync,一直到磁盘";。如果您正在实现像HBase这样的数据库,那么您需要这些和那些保证。如果你不是,那你就真的不是。在最近的hadoop版本中,试图在写入S3时使用它们只会记录一条警告消息,告诉您停止它。毕竟,这不是一个真正的文件系统。

完整规范,包括syncable:outputstream

按比例复制spark中的文件

如果您想在spark中高效地复制文件,请打开具有几个MB缓冲区的源文件和dest文件,读取缓冲区,然后将其写回。您可以将这些工作分布在集群中以获得更好的并行性,如下例所示:https://github.com/hortonworks-spark/cloud-integration/blob/master/spark-cloud-integration/src/main/scala/com/cloudera/spark/cloud/applications/CloudCp.scala

  1. 如果你只想复制一两个文件,只需在一个进程中完成,可能是多线程的
  2. 如果你真的在寻找针对s3的性能,那么就列出要复制的文件列表,先安排最大的几个文件,这样它们就不会在最后占据你,然后将列表的其余部分随机化,以避免在s3存储桶中创建热点

您可以使用任何一种,这取决于您需要什么。

它们都只是流实现。最终,您要做的是从一个bucket中获取和输入流,并将其写入另一个bucket的输出流。

FileInputStreamFileOutputStream具体组件,提供从映射文件读取和写入流的功能。

FSDataInputStreamFSDataOutputStream是inputstream的具体装饰器。意味着为输入流提供或修饰功能,例如读取和写入原语以及提供缓冲流。

选择哪一个?你需要FSDataOutputStream和FSDataInputStream提供的装饰吗?FileInputStream和FileOutputStream是否足够?

就我个人而言,我希望使用ReadersWriters,如下所示:

如何使用Java读取AWS S3文件?

private final AmazonS3 amazonS3Client = AmazonS3ClientBuilder.standard().build();
private Collection<String> loadFileFromS3() {
try (final S3Object s3Object = amazonS3Client.getObject(BUCKET_NAME,
FILE_NAME);
final InputStreamReader streamReader = new InputStreamReader(s3Object.getObjectContent(), StandardCharsets.UTF_8);
final BufferedReader reader = new BufferedReader(streamReader)) {
return reader.lines().collect(Collectors.toSet());
} catch (final IOException e) {
log.error(e.getMessage(), e)
return Collections.emptySet();
}
}

最新更新