从 Spark 中的多个.gz文件中读取特定文件



我正在尝试读取一个具有特定名称的文件,该文件存在于文件夹内的多个.gz文件中。
例如
D:/sample_datasets/gzfiles |-my_file_1.tar.gz |-my_file_1.tar |-file1.csv |-file2.csv |-file3.csv |-my_file_2.tar.gz |-my_file_2.tar |-file1.csv |-file2.csv |-file3.csv

我只对阅读在所有.gz文件中具有相同架构的file1.csv的内容感兴趣。

我正在将路径D:/sample_datasets/gzfiles传递给JavaSparkContext中的wholeTextFiles()方法。但是,它返回 tar 中所有文件的内容,即 file1.csv、file2.csv、file3.csv。

有没有办法我只能在数据集或RDD中读取file1.csv的内容。提前感谢!

在路径末尾使用*.gz

希望这有帮助!

我能够使用以下代码片段执行该过程,我从SO上的多个答案中使用


JavaPairRDD tarData = sparkContext.binaryFiles("D:/sample_datasets/gzfiles/*.tar.gz");
JavaRDD tarRecords = tarData.flatMap(new FlatMapFunction, Row>(){
private static final long serialVersionUID = 1L;
@Override
public Iterator call(Tuple2 t) throws Exception {
TsvParserSettings settings = new TsvParserSettings();
TsvParser parser = new TsvParser(settings);
List records = new ArrayList();
TarArchiveInputStream tarInput = new TarArchiveInputStream(new GzipCompressorInputStream(t._2.open()));
TarArchiveEntry entry;
while((entry = tarInput.getNextTarEntry()) != null) {
if(entry.getName().equals("file1.csv")) {
InputStreamReader streamReader = new InputStreamReader(tarInput);
BufferedReader reader = new BufferedReader(streamReader);
String line;
while((line = reader.readLine())!= null) {
String [] parsedLine = parser.parseLine(line);
Row row = RowFactory.create(parsedLine);
records.add(row);
}
reader.close();
break;
}
}
tarInput.close();
return records.iterator();
}
});

相关内容

  • 没有找到相关文章

最新更新