在Apache Flink中从输入文件创建对象



我有一个由文件夹和文件组成的数据集。文件夹/文件结构本身对数据分析很重要。

数据集结构:

folder1
   +-----file11
            +-----column1
            +-----column2

每个文件包含描述一个对象的数据。文件格式一致。它基本上是一个有两列的csv文件。这两列应该在结果对象中表示为元组序列。

文件的大小非常小。最多只有20 kb。每个文件夹大约包含200个文件。

期望的输出对象应该是:

{
    a: "folder1",              // name of parent folder
    b: "file11",               // name of content file
    c: Seq[(String, String)]   // content of file1
}

如何在Scala中处理这个数据集的读取?

有两种方法可以解决这个问题:

a)如果来自文件夹的数据非常小(小于几兆字节),您可以在本地进行读取,并使用ExecutionEnvironment.fromCollection()方法将数据带入Flink作业。

b)创建一个自定义InputFormat。InputFormat允许解析自定义文件格式。在您的情况下,我将扩展TextInputFormat并覆盖readRecord()方法。该方法将文件中的每一行作为字符串提供给您。然后,您可以手动解析来自String的数据,并返回解析后的结果,其中包含Tuple3中的目录信息。您可以从filePath变量访问路径。对于使用FileInputFormat s递归读取文件,存在recursive.file.enumeration配置值。

在阅读了上面的文章之后,我能够创建自定义FileInputFormat类来读取excel.xlsx文件并在flink中流式传输。代码如下

/*
 * Custom format output I was expecting from the records converted into records
 */
case class ExcelSheetData(Module : Double, StartTime : String, EndTime : String,....,  FileName : String)
/*
 * Custom class to read Excel spreadsheet using flink FileInputFormat class 
 */
class ExcelInputFormat extends FileInputFormat[ExcelSheetData]{
    var running : Boolean = false
    var excelData : Seq[ExcelSheetData] = null
    unsplittable = true
    override def open(fileSplit : FileInputSplit) = {
        println(fileSplit.getPath.toString.drop(6))
        val myFile = new File(fileSplit.getPath.toString.drop(6)) 
        val fileName = fileSplit.getPath.toString.drop(6)
        val fis = new FileInputStream(myFile)
        try{
            val myWorkbook = new XSSFWorkbook(fis)
            // println(s"Sheet Name: ${mySheet.getSheetName()}")
            // reading multiple sheets having identical data
            val mySheets = myWorkbook.iterator().asScala   
            val exData = for(s <- mySheets
                            if(s.getSheetName() == "Sheet1" || s.getSheetName() == "Sheet")) yield  {
                            val rowItr = s.rowIterator().asScala
                                for(e <- rowItr
                                    if(e.getRowNum() > 5 && e.getCell(1).getCellType() == 0)) yield {
                                    (e.getCell(1).getDateCellValue(), e.getCell(2).getDateCellValue(), ......,
                                    ,fileName)
                            }
                }
            excelData = exData.toSeq.flatten.map( record => {ExcelSheetData(record._1, record._2.toString, record._3.toString, record._4, record._5, record._6, record._7, record._8, record._9)})
            running = if(excelData.length >= 1) true else false
        } finally { fis.close()}
    }
    override def nextRecord(reuse: ExcelSheetData): ExcelSheetData = { 
        val head = excelData.head 
        excelData = excelData.tail 
        running = if (excelData.length == 0) false else true 
        head 
    }
    override def reachedEnd(): Boolean = ! running  
}
/*
 * Initialize custom class to read Excel Input
 */
val excelInput = new ExcelInputFormat()
// Read excel data into flink stream
val excelData = senv.readFile(excelInput, ExcelFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 1000)
                            .uid("Excel File Read")
//Windowing code down below... 

HadoopOffice库原生支持Flink Table API以及Flink DataSource/DataSink for Excel文件

https://github.com/ZuInnoTe/hadoopoffice/wiki

最新更新