在 Flink 中读取 xml 文件



我正在尝试使用 flink 同步一个进程,以从本地文件系统读取 xml 文件并将其同步到 s3。


我需要解析每个 xml 文件中的 taf 并使用它来将其发送到 s3 中的相应文件夹。

例如:我的文件包含文件夹1 ....三十

我需要从中读取值并将其发送到/folder1

我能够读取文件内容并将其同步到 s3,但内容是逐行出现的。

我按照 中的建议使用了 TextInputFormat NFS (Netapp server(-> Flink ->s3

我尝试了不同的格式,如分隔符输入格式等,但没有成功。 我通过谷歌搜索,但找不到任何解决方案。这不是支持的东西吗?

有没有办法读取整个文件或至少在标签之间读取值?

StreamExecutionEnvironment env =    
StreamExecutionEnvironment.getExecutionEnvironment();
// monitor directory, checking for new files
// every 100 milliseconds
TextInputFormat format = new TextInputFormat(
new org.apache.flink.core.fs.Path("file:///tmp/dir/"));
DataStream<String> inputStream = env.readFile(
format, 
"file:///tmp/dir/",
FileProcessingMode.PROCESS_CONTINUOUSLY, 
100, 
FilePathFilter.createDefaultFilter());

首先,我假设这是针对批处理(数据集(工作流的。我通常通过创建文件路径列表作为工作流的输入来处理此问题,使用自定义源来处理拆分这些路径以实现并行性。然后我有一个MapFunction,它将文件路径作为输入,打开/读取XML文件并对其进行解析,然后将有趣的提取数据位发送到下游。

另一种方法是使用现有的几种HadoopXmlInputFormat实现之一(例如,这是Mahout的一部分(。将HadoopInputFormat与Flink一起使用需要一些工作,但它是可行的。例如(未经测试!!(:

Job job = Job.getInstance();
FileInputFormat.addInputPath(job, new Path(inputDir));
HadoopInputFormat<LongWritable, Text> inputFormat = HadoopInputs.createHadoopInput(new XmlInputFormat(), LongWritable.class, Text.class, job);
Configuration parameters = new Configuration();
parameters.setBoolean("recursive.file.enumeration", true);
inputFormat.configure(parameters);
...
env.createInput(inputFormat);

相关内容

  • 没有找到相关文章

最新更新