Reading a CSV file as a DataStream in Flink

I'm new to Apache Flink (version 1.32) and am trying to read a CSV file into a DataStream.

I can read it as a stream of `String`s:

import org.apache.flink.api.java.io.TextInputFormat
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.source.FileProcessingMode
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}

object AdvCsvRead {
  def main(args: Array[String]): Unit = {
    // set up the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val path = "src/main/resources/sales_orders.csv"
    val ds: DataStream[String] =
      env.readFile(new TextInputFormat(new Path(path)), path, FileProcessingMode.PROCESS_ONCE, 100)
    ds.print()
    env.execute("AdvCsvRead")
  }

  case class Sales(
    var ID: Integer,
    var Customer: String,
    var Product: String,
    var Date: String,
    var Quantity: Integer,
    var Rate: Double,
    var Tags: String
  )
}

What I need is an example of reading the CSV into a `DataStream` of a Scala case class. The documentation on this is limited, so any help is appreciated!
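One common approach is to keep the `String` stream from the question and parse each line into the case class with `map`. Below is a minimal sketch under two assumptions I'm making (not stated in the question): the file has no header row, and fields are plain comma-separated with no quoted commas (a real CSV parser would be needed otherwise). I've also used Scala's `Int` instead of `java.lang.Integer` for idiomatic field types; the `toSales` helper name is my own.

```scala
// Sketch: parse one CSV line into a Sales case class.
// Assumes exactly seven plain comma-separated fields (no quoting/escaping).
case class Sales(
  id: Int,
  customer: String,
  product: String,
  date: String,
  quantity: Int,
  rate: Double,
  tags: String
)

def toSales(line: String): Sales = {
  // limit -1 keeps trailing empty fields, e.g. a blank Tags column
  val f = line.split(",", -1).map(_.trim)
  Sales(f(0).toInt, f(1), f(2), f(3), f(4).toInt, f(5).toDouble, f(6))
}

// With the String stream from the question's code:
// val sales: DataStream[Sales] = ds.map(toSales _)
```

Flink's Scala API derives the `TypeInformation` for the case class via the imported `createTypeInformation`, so the mapped stream serializes without extra configuration.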

A few examples:

I found a recipe here: https://docs.immerok.cloud/docs/how-to-guides/development/continuously-reading-csv-files-with-apache-flink/

https://github.com/rxda/flink-scala3-demo/blob/main/src/main/scala/com/ververica/Example_00.scala

https://github.com/twalthr/flink-api-scala-3/blob/main/src/main/scala/Main.scala

Here is a complete working example: https://github.com/sandeep540/flink-scala3-csv
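If the CSV has a header row, or may contain occasional malformed lines, a defensive variant of the line parser returns `Option[Sales]` so bad rows can be dropped with `flatMap` instead of crashing the job. This is a sketch of my own (the `safeToSales` helper is not from the linked example), with the case class redeclared so the snippet is self-contained:

```scala
import scala.util.Try

case class Sales(
  id: Int, customer: String, product: String, date: String,
  quantity: Int, rate: Double, tags: String)

// Returns None for the header row or any line that fails to parse,
// so the stream silently skips them instead of throwing.
def safeToSales(line: String): Option[Sales] = {
  val f = line.split(",", -1).map(_.trim)
  if (f.length != 7) None
  else Try(Sales(f(0).toInt, f(1), f(2), f(3), f(4).toInt, f(5).toDouble, f(6))).toOption
}

// In the Flink job, Option is implicitly viewed as an Iterable, so:
// val sales: DataStream[Sales] = ds.flatMap(safeToSales _)
```

The header line fails at `f(0).toInt`, so `Try(...).toOption` yields `None` and `flatMap` filters it out along with any other unparsable rows.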

Latest update