在HDFS上闪烁写入会产生空文件



我有一个flink作业,它使用TextOutputFormat将数据写入目标。代码如下:

   String basePath = "/Users/me/out";
   // String basePath = "hdfs://10.199.200.204:9000/data";
   // ensure we have a format for this.
   TextOutputFormat<String> format = new TextOutputFormat<>(new Path(basePath, selection + "/" + uid));
   StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
    format.configure(GlobalConfiguration.getConfiguration());
    format.open(context.getIndexOfThisSubtask(), context.getNumberOfParallelSubtasks());
   // then serialize and write.
   String record = serializationFunction.map(value);
   log.info("Writing " + record);
   format.writeRecord(record);

当使用普通文件系统上的路径作为目标时,这一点非常好。然而,当我将基本路径更改为hdfs位置时,它就不再像预期的那样工作了。实际情况是,输出文件实际上是在HDFS上创建的,但它的大小为零字节。我在通话中没有遇到任何异常。

我使用的是Hadoop2.6.0和Flink 0.10.1。使用命令行工具(hadoop fs -put ...)将文件复制到hdfs是可行的,所以我认为我可以排除Hadoop的一些错误配置。此外,我启动了Wireshark,看到数据被传输到Hadoop服务器,所以在实际编写之前,我是否需要提交一些东西?

为了将结果刷新到HDFS,您必须在完成记录写入后调用TextOutputFormatclose方法。

// do writing
while (some condition) {
    format.writeRecord(record);
}
// finished writing
format.close();

我发现了它发生的原因。实际上有两个原因:

  1. Till Rohrmann指出,输出格式没有被刷新。由于我在流媒体作业中使用该格式,因此无法关闭该格式。我求助于编写我自己的可以刷新的格式:

    public class MyTextOutputFormat<T> extends TextOutputFormat<T> {
        public MyTextOutputFormat(Path outputPath) {
            super(outputPath);
        }
        public MyTextOutputFormat(Path outputPath, String charset) {
            super(outputPath, charset);
        }
        // added a custom flush method here.
        public void flush() throws IOException {
            stream.flush();
        }
    }
    
  2. 我在虚拟机客户机中运行HDFS,并从虚拟机主机连接到它。Flink的HDFS客户端默认使用数据节点的IP地址连接到数据节点。然而,数据节点的IP地址被报告为127.0.0.1。所以flink尝试连接到127.0.0.1,当然主机系统中没有HDFS数据节点在那里运行。然而,这只是在我添加手动冲洗操作后才显示出来的。为了解决这个问题,我不得不改变两件事:

    • 在VM来宾内部,修改$HADOOP_HOME/etc/hadoop/hdfs-site.xml并添加

      <property>
          <name>dfs.datanode.hostname</name>
          <value>10.199.200.204</value> <!-- IP of my VM guest -->
      </property>
      

      此更改使namenode报告数据节点的正确可路由主机名。这实际上是一个没有记录的环境,但似乎有效。

    • 在flink实际运行的系统上,我必须在文件夹(例如/home/me/conf)中创建一个hdfs-site.xml,然后必须设置一个指向/home/me/conf的环境变量HADOOP_CONF_DIR。该文件包含以下内容:

      <?xml version="1.0" encoding="UTF-8"?>
      <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
      <configuration>
          <property>
              <name>dfs.client.use.datanode.hostname</name>
              <value>true</value>
          </property>
      </configuration>
      

      这个更改指示hadoop客户端使用主机名而不是ip地址来连接到数据节点。在这些更改之后,我的数据被正确地写入HDFS。

相关内容

  • 没有找到相关文章

最新更新