flink apache :什么是最有效的:读取文件并将文本拆分为元组或读取csv



我做了这个小代码,将文件夹中的文件放入数据流中:

public class TextFromDirStream {
//
//  Program
//
public static void main(String[] args) throws Exception {
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
// monitor directory, checking for new files
// every 100 milliseconds
TextInputFormat format = new TextInputFormat(
new org.apache.flink.core.fs.Path("file:///tmp/dir/"));
DataStream<String> inputStream = env.readFile(
format,
"file:///tmp/dir/",
FileProcessingMode.PROCESS_CONTINUOUSLY,
100,
FilePathFilter.createDefaultFilter());
inputStream.print();
// execute program
env.execute("Java read file from folder Example");
}

}

我的下一步是处理文件内容(csv)。处理这个问题最有效的方法是什么?我是否更改我的代码以解析文本文件 inputStream 并将其转换为元组或 readFile 作为 CSV。我问这个问题是因为我很难找到有关如何将文本拆分为元组的示例或文档。

提前谢谢你

从代码开始,流中的每个事件 (inputStream) 都是一行字符串。您可以将一条线映射到元组:

DataStream<Tuple2<Long, String>> parsedStream = inputStream
.map((line) -> {
String[] cells = line.split(",");
// Only keep first and third cells
return new Tuple2(Long.parseLong(cells[2]), cells[0]); 
});

您还可以使用包含字段选择并且能够创建TupleX或POJO的readCsvFile(但是readCsvFile没有PROCESS_CONTINUOUSLY)。另请注意,如果您使用 PROCESS_CONTINUOUSLY,则每个修改后的文件将被完全(再次)处理,这与一个文件不匹配!

最新更新