Apache Flink with Hadoop HDFS: wrong FS expected file:///



我有一个自定义Flink Sink,它正在写入HDFS,使用以下代码实例化FileSystem对象。

val path = new Path("/path/to/one/hdfs/dir")
val hadoopJob   = Job.getInstance
val hadoopConf   = hadoopJob.getConfiguration
val fs = FileSystem.get(hadoopConf)
val os = fs.create(path)

我设置了属性fs.hdfs.hadoopconf在flink配置文件中指向我有hadoop配置文件的目录。

在core-site.xml中我定义了属性fs.defaultFS如下所示。

<property>
<name>fs.defaultFS</name>
<value>hdfs://hostname:port</value>
</property>

它失败是因为它正在实例化一个对象类型LocalFileSystem,而不是DistributedFileSystem。以下是我得到的例外:

. lang。IllegalArgumentException: Wrong FS: hdfs://compute-0-0:9000/esteban。Collado/kmers,期望:file:///org.apache.hadoop.fs.FileSystem.checkPath (FileSystem.java: 665)org.apache.hadoop.fs.RawLocalFileSystem.pathToFile (RawLocalFileSystem.java: 86)org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission (RawLocalFileSystem.java: 542)org.apache.hadoop.fs.RawLocalFileSystem.mkdirs (RawLocalFileSystem.java: 528)

谁能给我一些提示关于可能的问题?

谢谢,

可能您可以查看filessystem .get(path)方法,通过path来识别最终的文件系统[1]: https://i.stack.imgur.com/puPzC.png