我想使用Flink,Scala-Language和AddSource-和ReadCSVfile-intrions读取CSV文件。我还没有找到任何简单的例子。我只发现:https://github.com/dataartisans/flink-training-exercises/blob/master/src/src/main/scala/scala/com/com/dataartisans/flinktraining/flinktraining/datastream_scala/cep/cep/cep/longrides.scala,出于我的目的。
在定义中:streamExecutionEnvironment.AddSource(sourceFunction(我应该仅将readcsvfile用作sourceFile?
阅读后,我想使用CEP(复杂的事件处理(。
readcsvfile((仅作为Flink DataSet(batch(API的一部分可用,并且不能与DataStream(Streaming(API一起使用。这是ReadCsvfile((的一个很好的例子,尽管它可能与您要做的事情无关。
readTextFile((和readfile((是streamExecutionEnvironment上的方法,并且不实现源函数接口 - 它们并非与AddSource((一起使用,而是代替它。这是使用readTextFile((使用DataStream API加载CSV的示例。
另一个选项是使用表API和csvtablesource。这是一个示例和一些讨论,讨论了它的作用和不做。如果您走此路线,则需要使用streamTableEnvironment.toAppendStream((将表流转换为数据流之前,然后使用CEP。
请记住,所有这些方法都将仅读取文件一次,然后从其内容中创建有界的流。如果您想要一个在无限的CSV流中读取的源,并等待附加新行,则需要另一种方法。您可以使用自定义源,或sockettextstream或kafka之类的东西。
如果您有一个带有3个字段的CSV文件 - 字符串,长,整数
然后在下面做
val input=benv.readCsvFile[(String,Long,Integer)]("hdfs:///path/to/your_csv_file")
ps: - 我正在使用flink shell,这就是为什么我有benv