Flink Hadoop integration problem - Could not find a file system implementation for scheme 'hdfs'



I am trying to integrate HDFS with Flink.

Scala binary version: 2.12

Flink (cluster) version: 1.10.1

Here is the HADOOP_CONF_DIR:

The HDFS configuration is here:

This configuration and the HADOOP_CONF_DIR are identical on the task managers as well.
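For reference, below is a minimal sketch of one way the hadoopConf object used in the sample code further down could be built from the files in this HADOOP_CONF_DIR; the class name and exact file paths are illustrative assumptions, not part of the original setup.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// Illustrative only: loads core-site.xml / hdfs-site.xml from HADOOP_CONF_DIR
// (the docker command below uses /tmp/hadoopconf) into a Hadoop Configuration.
public final class HadoopConfLoader {
    public static Configuration load() {
        Configuration hadoopConf = new Configuration();
        hadoopConf.addResource(new Path("/tmp/hadoopconf/core-site.xml"));
        hadoopConf.addResource(new Path("/tmp/hadoopconf/hdfs-site.xml"));
        return hadoopConf;
    }
}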

pom.xml:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-filesystem_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-hadoop-compatibility_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>2.8.3</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-parquet_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.10.0</version>
    </dependency>
</dependencies>

I am trying to read a Parquet file from HDFS; my sample code is below:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.formats.parquet.ParquetTableSource;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

// Assumption: Hadoop configuration built from HADOOP_CONF_DIR (e.g., as sketched
// above); the original snippet does not show how it is constructed.
Configuration hadoopConf = new Configuration();

// Parquet schema of the file stored on HDFS.
Types.MessageTypeBuilder builder = Types.buildMessage();
MessageType messageType = builder
        .required(INT64).named("column1")
        .required(BINARY).as(UTF8).named("column2")
        .required(INT64).named("column3")
        .required(INT64).named("column4")
        .required(BINARY).as(UTF8).named("column5")
        .required(BINARY).named("column6")
        .named("AppendTest");

ParquetTableSource parquetTableSource = ParquetTableSource.builder()
        .path("hdfs://hdfs:8020/historic_data/data.parquet")
        .withConfiguration(hadoopConf)
        .forParquetSchema(messageType)
        .build();

tEnv.registerTableSource("datatable", parquetTableSource);
Table table = tEnv.sqlQuery("select * from datatable");
DataSet<Row> tempDataSet = tEnv.toDataSet(table, Row.class);
tempDataSet.print(); // print() already triggers execution of the batch plan
env.execute("Job name - short desc.");

The error is below:

Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:152)
at org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:84)
at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$6(Dispatcher.java:381)
at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
... 7 more
Caused by: org.apache.flink.runtime.JobException: Creating the input splits caused an error: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:271)
at org.apache.flink.runtime.executiongraph.ExecutionGraph.attachJobGraph(ExecutionGraph.java:807)
at org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:228)
at org.apache.flink.runtime.scheduler.SchedulerBase.createExecutionGraph(SchedulerBase.java:256)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:228)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:216)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:120)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:105)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:278)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:266)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:98)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:40)
at org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:146)
... 10 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 'hdfs'. The scheme is not directly supported by Flink and no Hadoop file system to support this scheme could be loaded.
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:450)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:362)
at org.apache.flink.core.fs.Path.getFileSystem(Path.java:298)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:587)
at org.apache.flink.api.common.io.FileInputFormat.createInputSplits(FileInputFormat.java:62)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:257)
... 22 more
Caused by: org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Hadoop is not in the classpath/dependencies.
at org.apache.flink.core.fs.UnsupportedSchemeFactory.create(UnsupportedSchemeFactory.java:58)
at org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:446)
... 27 more

This is the strange part, because the Hadoop uber jar is right there under the lib folder.
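To narrow down which JVM is missing Hadoop support, a small probe like the one below can be run from the client and from inside a job; it performs the same scheme lookup that fails on the JobManager in the stack trace above. This snippet is an illustrative addition, not part of the original job.

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

// Illustrative probe: resolves the 'hdfs' scheme through Flink's FileSystem SPI.
// It succeeds only if a Hadoop file system factory is visible to this JVM.
public class HdfsSchemeProbe {
    public static void main(String[] args) throws Exception {
        FileSystem fs = new Path("hdfs://hdfs:8020/").getFileSystem();
        System.out.println("Resolved 'hdfs' to: " + fs.getClass().getName());
    }
}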

This is how I submit the job:

docker exec -it jobmanager env HADOOP_CONF_DIR=/tmp/hadoopconf flink run -C file:///opt/flink/lib/flink-shaded-hadoop-2-uber-2.7.5-10.0.jar -d /tmp/core-batch-1.0.0.jar

I also tried submitting the job through the Flink UI, but the result is the same.

Any help would be appreciated.

Thanks.

@Aykut, I'm not sure how much this helps, but I have recently been testing Flink 1.12 delivered to HDP3 via a custom Ambari service that I created from the Ambari Flink service on GitHub. When I install Flink into a base Ambari cluster with YARN/HDFS installed, everything works perfectly with all dependencies. Once the Flink YARN application is running in the YARN cluster, the rest of my application tests work fine. See Flink on YARN for reference.

Here are my Flink user environment variables:

export HADOOP_CONF_DIR=/etc/hadoop/conf; export HADOOP_CLASSPATH=/usr/hdp/3.1.4.0-315/hadoop/conf:/usr/hdp/3.1.4.0-315/hadoop/lib/*:/usr/hdp/3.1.4.0-315/hadoop/.//*:/usr/hdp/3.1.4.0-315/hadoop-hdfs/./:/usr/hdp/3.1.4.0-315/hadoop-hdfs/lib/*:/usr/hdp/3.1.4.0-315/hadoop-hdfs/.//*:/usr/hdp/3.1.4.0-315/hadoop-mapreduce/lib/*:/usr/hdp/3.1.4.0-315/hadoop-mapreduce/.//*:/usr/hdp/3.1.4.0-315/hadoop-yarn/./:/usr/hdp/3.1.4.0-315/hadoop-yarn/lib/*:/usr/hdp/3.1.4.0-315/hadoop-yarn/.//*:/usr/hdp/3.1.4.0-315/tez/*:/usr/hdp/3.1.4.0-315/tez/lib/*:/usr/hdp/3.1.4.0-315/tez/conf:/usr/hdp/3.1.4.0-315/tez/conf_llap:/usr/hdp/3.1.4.0-315/tez/doc:/usr/hdp/3.1.4.0-315/tez/hadoop-shim-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/hadoop-shim-2.8-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib:/usr/hdp/3.1.4.0-315/tez/man:/usr/hdp/3.1.4.0-315/tez/tez-api-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-common-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-dag-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-examples-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-history-parser-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-javadoc-tools-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-job-analyzer-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-mapreduce-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-protobuf-history-plugin-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-runtime-internals-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-runtime-library-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-tests-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-cache-plugin-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-history-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-history-with-acls-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/tez-yarn-timeline-history-with-fs-0.9.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/ui:/usr/hdp/3.1.4.0-315/tez/lib/async-http-client-1.9.40.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-cli-1.2.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-codec-1.4.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-collections-3.2.2.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-collections4-4.1.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-io-2.4.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-lang-2.6.jar:/usr/hdp/3.1.4.0-315/tez/lib/commons-math3-3.1.1.jar:/usr/hdp/3.1.4.0-315/tez/lib/gcs-connector-1.9.10.3.1.4.0-315-shaded.jar:/usr/hdp/3.1.4.0-315/tez/lib/guava-28.0-jre.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-aws-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-azure-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-azure-datalake-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-hdfs-client-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-mapreduce-client-common-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-mapreduce-client-core-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/hadoop-yarn-server-timeline-pluginstorage-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/tez/lib/jersey-client-1.19.jar:/usr/hdp/3.1.4.0-315/tez/lib/jersey-json-1.19.jar:/usr/hdp/3.1.4.0-315/tez/lib/jettison-1.3.4.jar:/usr/hdp/3.1.4.0-315/tez/lib/jetty-server-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/tez/lib/jetty-util-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/tez/lib/jsr305-3.0.0.jar:/usr/hdp/3.1.4.0-315/tez/lib/metrics-core-3.1.0.jar:/usr/hdp/3.1.4.0-315/tez/lib/protobuf-java-2.5.0.jar:/usr/hdp/3.1.4.0-315/tez/lib/RoaringBitmap-0.4.9.jar:/usr/hdp/3.1.4.0-315/tez/lib/servlet-api-2.5.jar:/usr/hdp/3.1.4.0-315/tez/lib/slf4j-api-1.7.10.jar:/u
sr/hdp/3.1.4.0-315/tez/lib/tez.tar.gz;
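As a quick sanity check, a probe along these lines can confirm that the HADOOP_CLASSPATH above actually makes the Hadoop classes visible to a JVM started with it. This is an illustrative sketch, not part of the original reply; the class names are standard Hadoop classes.

// Illustrative check: reports whether core Hadoop classes are on the classpath
// of the JVM running this code.
public class HadoopClasspathProbe {
    public static void main(String[] args) {
        String[] classes = {
                "org.apache.hadoop.conf.Configuration",
                "org.apache.hadoop.hdfs.DistributedFileSystem"
        };
        for (String name : classes) {
            try {
                Class.forName(name);
                System.out.println(name + " -> found");
            } catch (ClassNotFoundException e) {
                System.out.println(name + " -> MISSING");
            }
        }
    }
}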

Here is my run command:

/opt/flink/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/bin/flink run javaProgram.jar

Here is the sample Java code (ingesting an S3 file into HDFS via Flink):

package org.myorg.quickstart;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class S3FlinkIngest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> dataStream = streamExecutionEnvironment.readTextFile("s3a://s3Bucket/input.xml");
        dataStream.writeAsText("hdfs://hadoopHost:8020/user/flink/ingest.xml", org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
        streamExecutionEnvironment.execute("S3 Flink Ingest");
    }
}
