我有一个由文件夹和文件组成的数据集。文件夹/文件结构本身对数据分析很重要。
数据集结构:
folder1
+-----file11
+-----column1
+-----column2
每个文件包含描述一个对象的数据。文件格式一致。它基本上是一个有两列的csv文件。这两列应该在结果对象中表示为元组序列。
文件的大小非常小。最多只有20 kb。每个文件夹大约包含200个文件。
期望的输出对象应该是:
{
a: "folder1", // name of parent folder
b: "file11", // name of content file
c: Seq[(String, String)] // content of file1
}
如何在Scala中处理这个数据集的读取?
有两种方法可以解决这个问题:
a)如果来自文件夹的数据非常小(小于几兆字节),您可以在本地进行读取,并使用ExecutionEnvironment.fromCollection()
方法将数据带入Flink作业。
b)创建一个自定义InputFormat。InputFormat允许解析自定义文件格式。在您的情况下,我将扩展TextInputFormat
并覆盖readRecord()
方法。该方法将文件中的每一行作为字符串提供给您。然后,您可以手动解析来自String的数据,并返回解析后的结果,其中包含Tuple3中的目录信息。您可以从filePath
变量访问路径。对于使用FileInputFormat
s递归读取文件,存在recursive.file.enumeration
配置值。
在阅读了上面的文章之后,我能够创建自定义FileInputFormat类来读取excel.xlsx文件并在flink中流式传输。代码如下
/*
* Custom format output I was expecting from the records converted into records
*/
case class ExcelSheetData(Module : Double, StartTime : String, EndTime : String,...., FileName : String)
/*
* Custom class to read Excel spreadsheet using flink FileInputFormat class
*/
class ExcelInputFormat extends FileInputFormat[ExcelSheetData]{
var running : Boolean = false
var excelData : Seq[ExcelSheetData] = null
unsplittable = true
override def open(fileSplit : FileInputSplit) = {
println(fileSplit.getPath.toString.drop(6))
val myFile = new File(fileSplit.getPath.toString.drop(6))
val fileName = fileSplit.getPath.toString.drop(6)
val fis = new FileInputStream(myFile)
try{
val myWorkbook = new XSSFWorkbook(fis)
// println(s"Sheet Name: ${mySheet.getSheetName()}")
// reading multiple sheets having identical data
val mySheets = myWorkbook.iterator().asScala
val exData = for(s <- mySheets
if(s.getSheetName() == "Sheet1" || s.getSheetName() == "Sheet")) yield {
val rowItr = s.rowIterator().asScala
for(e <- rowItr
if(e.getRowNum() > 5 && e.getCell(1).getCellType() == 0)) yield {
(e.getCell(1).getDateCellValue(), e.getCell(2).getDateCellValue(), ......,
,fileName)
}
}
excelData = exData.toSeq.flatten.map( record => {ExcelSheetData(record._1, record._2.toString, record._3.toString, record._4, record._5, record._6, record._7, record._8, record._9)})
running = if(excelData.length >= 1) true else false
} finally { fis.close()}
}
override def nextRecord(reuse: ExcelSheetData): ExcelSheetData = {
val head = excelData.head
excelData = excelData.tail
running = if (excelData.length == 0) false else true
head
}
override def reachedEnd(): Boolean = ! running
}
/*
* Initialize custom class to read Excel Input
*/
val excelInput = new ExcelInputFormat()
// Read excel data into flink stream
val excelData = senv.readFile(excelInput, ExcelFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
.uid("Excel File Read")
//Windowing code down below...
HadoopOffice库原生支持Flink Table API以及Flink DataSource/DataSink for Excel文件
https://github.com/ZuInnoTe/hadoopoffice/wiki