所以我必须检索存储在HDFS中的文件的内容并对其执行某些分析。
问题是,我什至无法读取文件并将其内容写入本地文件系统中的另一个文本文件。(我是 Flink 的新手,这只是一个测试,以确保我正确读取文件)
HDFS中的文件是一个纯文本文件。这是我的代码:
public class readFromHdfs {
public static void main(String[] args) throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> lines = env.readTextFile("hdfs://localhost:9000//test/testfile0.txt");
lines.writeAsText("/tmp/hdfs_file.txt");
env.execute("File read from HDFS");
}
}
运行后/tmp 中没有输出。
这是一个非常简单的代码,我不确定它是否有问题,或者我只是做错了其他事情。正如我所说,我对 Flink 完全陌生
此外,作业在 Web 仪表板中显示为失败。这是 flink 日志的集合: https://pastebin.com/rvkXPGHU
提前致谢
编辑:我通过增加任务槽的数量解决了这个问题。网络仪表板显示了一个可用的任务槽,它并没有抱怨没有足够的槽位,所以我认为不可能是这样。
无论如何,写成文本它并没有像我预期的那样真正工作。我从testfile0中读取内容.txt好吧,但它没有将它们写入hdfs_file.txt。相反,它以该名称创建一个目录,并在其中 8 个文本文件,其中 6 个完全为空。另外两个包含 testfile0.txt(其中大部分在 1.txt 中,最后一个块在 2.txt 中)。
虽然这并不重要,因为文件的内容被正确存储在 DataSet 中,所以我可以继续对数据进行分析。
它按预期工作 - 您已将完整作业的并行度(因此也设置为输出格式)为 8,因此每个插槽都会创建自己的文件(因为您可能知道并发写入单个文件是不安全的)。如果只需要 1 个输出文件,则应writeAsText(...).setParalellis(1)
覆盖全局并行属性。
如果你想在本地文件系统而不是HDFS中获取输出,你应该在path中明确设置"file://"协议,因为对于Hadoop,flink默认看起来是"hdfs://"。