我有一个flink作业,它使用TextOutputFormat将数据写入目标。代码如下:
String basePath = "/Users/me/out";
// String basePath = "hdfs://10.199.200.204:9000/data";
// ensure we have a format for this.
TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection + "/" + uid));
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
format.configure(GlobalConfiguration.getConfiguration());
format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
// then serialize and write.
String record = serializationFunction.map(value);
log.info("Writing " + record);
format.writeRecord(record);
当使用普通文件系统上的路径作为目标时,这一点非常好。然而,当我将基本路径更改为hdfs位置时,它就不再像预期的那样工作了。实际情况是,输出文件实际上是在HDFS上创建的,但它的大小为零字节。我在通话中没有遇到任何异常。
我使用的是Hadoop2.6.0和Flink 0.10.1。使用命令行工具(hadoop fs -put ...
)将文件复制到hdfs是可行的,所以我认为我可以排除Hadoop的一些错误配置。此外,我启动了Wireshark,看到数据被传输到Hadoop服务器,所以在实际编写之前,我是否需要提交一些东西?
为了将结果刷新到HDFS,您必须在完成记录写入后调用TextOutputFormat
的close
方法。
// do writing
while (some condition) {
format.writeRecord(record);
}
// finished writing
format.close();
我发现了它发生的原因。实际上有两个原因:
-
Till Rohrmann指出,输出格式没有被刷新。由于我在流媒体作业中使用该格式,因此无法关闭该格式。我求助于编写我自己的可以刷新的格式:
public class MyTextOutputFormat<T> extends TextOutputFormat<T> { public MyTextOutputFormat(Path outputPath) { super(outputPath); } public MyTextOutputFormat(Path outputPath, String charset) { super(outputPath, charset); } // added a custom flush method here. public void flush() throws IOException { stream.flush(); } }
-
我在虚拟机客户机中运行HDFS,并从虚拟机主机连接到它。Flink的HDFS客户端默认使用数据节点的IP地址连接到数据节点。然而,数据节点的IP地址被报告为
127.0.0.1
。所以flink尝试连接到127.0.0.1
,当然主机系统中没有HDFS数据节点在那里运行。然而,这只是在我添加手动冲洗操作后才显示出来的。为了解决这个问题,我不得不改变两件事:-
在VM来宾内部,修改
$HADOOP_HOME/etc/hadoop/hdfs-site.xml
并添加<property> <name>dfs.datanode.hostname</name> <value>10.199.200.204</value> <!-- IP of my VM guest --> </property>
此更改使namenode报告数据节点的正确可路由主机名。这实际上是一个没有记录的环境,但似乎有效。
-
在flink实际运行的系统上,我必须在文件夹(例如
/home/me/conf
)中创建一个hdfs-site.xml
,然后必须设置一个指向/home/me/conf
的环境变量HADOOP_CONF_DIR
。该文件包含以下内容:<?xml version="1.0" encoding="UTF-8"?> <?xml-stylesheet type="text/xsl" href="configuration.xsl"?> <configuration> <property> <name>dfs.client.use.datanode.hostname</name> <value>true</value> </property> </configuration>
这个更改指示hadoop客户端使用主机名而不是ip地址来连接到数据节点。在这些更改之后,我的数据被正确地写入HDFS。
-