如何在 Flink 中连续读取 CSV 文件并删除标头



我正在使用 Flink 流 API,我想从文件夹中连续读取 CSV 文件,忽略标题并将 CSV 文件中的每一行转换为 Java 类 (POJO(。在所有这些处理之后,我应该获得一个Java对象流(POJO(。

到目前为止,我执行以下操作来部分实现行为(下面的代码(:

  • 连续将CSV文件作为常规文本文件读取
  • 从 CSV 文件中获取字符串流
  • 将字符串流转换为 Java 对象流

    String path = "/home/cosmin/Projects/flink_projects/flink-java-project/data/";
    TextInputFormat format = new TextInputFormat(
    new org.apache.flink.core.fs.Path(path));
    DataStream<String> inputStream = streamEnv.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 100);
    DataStream<MyEvent> parsedStream = inputStream
    .map((line) -> {
    String[] cells = line.split(",");
    MyEvent event = new MyEvent(cells[1], cells[2], cells[3]);
    return event;
    });
    

但是,有了这个,我无法删除每个 CSV 文件中的标题行。

我已经读到,我可以通过使用 StreamExecutionEnvironment 类上的createInput()addSource ()方法来构建用于读取 CSV 文件的自定义连接器。

你能帮忙提供一些关于如何实现这一点的指导吗,因为我还没有找到Javadoc以外的任何例子?

您可以在映射函数之前链接一个过滤器函数以过滤掉标题行

inputStream.filter(new FilterFunction<String>() {
public boolean filter(String line) { 
if (line.contains("some header identifier")) return false;
else return true;
}
}).map(...)     <Your map function as before>

相关内容

  • 没有找到相关文章