Flink (1.11.2) - 尽管设置了正确的插件,但找不到 S3 的实现。使用 JDK11 和 Scala 2.12.11



我在Linux中使用Docker运行一个带有单个节点的Flink独立集群。我已经在生产中使用Flink 1.10.0和JDK8运行了一段时间的早期版本,我能够在那里正常运行S3。现在我正在尝试更新到一个新版本,使用本地S3实现在我的开发机器上运行Docker。无论我尝试什么,这个错误都会不断出现:

org.apache.flink.core.fs.UnsupportedFileSystemSchemeException: Could not find a file system implementation for scheme 's3'.

S3方案似乎没有映射到适当的类。我确信Flink正在挑选合适的插件。我有以下依赖项:

val testDependencies = Seq(
"org.scalatest" %% "scalatest" % "3.2.0" % "test"
)
val miscDependencies = Seq(
"com.github.tototoshi" %% "scala-csv" % "1.3.6",
"org.lz4" % "lz4-java" % "1.5.1",
"org.json4s" %% "json4s-jackson" % "3.6.1",
"org.apache.hadoop" % "hadoop-common" % "3.2.1",
"redis.clients" % "jedis" % "2.9.0",
"com.googlecode.plist" % "dd-plist" % "1.21",
"com.couchbase.client" % "java-client" % "2.7.14",
"org.apache.parquet" % "parquet-avro" % "1.11.1",
)
val flinkDependencies = Seq(
"org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
"org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
"org.apache.flink" % "flink-s3-fs-hadoop" % flinkVersion % "provided",
"org.apache.flink" % "flink-metrics-dropwizard" % flinkVersion,
"org.apache.flink" % "flink-formats" % flinkVersion pomOnly(),
"org.apache.flink" % "flink-compress" % flinkVersion,
"org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion,
"org.apache.flink" %% "flink-clients" % flinkVersion,
"org.apache.flink" %% "flink-parquet" % flinkVersion
)

我确认我正在严格遵守文件。

经过一段时间的努力,我终于解决了这个问题。我将把我的解决方案留在这里,以防有人遇到同样的问题。

一旦jobmanager和taskmanager启动,就会检测到插件类,例如S3文件系统工厂,但是,它们并没有被加载。在我的设置中,一旦作业启动,就必须动态加载类。你可以在这里找到更多关于Flink如何加载类的信息。

正如这里所解释的,加载类的提示是由作业jar中META-INF/services中存在的文件给出的。要使S3插件工作,您需要有以下文件:

META-INF/services/org.apache.flink.core.fs.FileSystemFactory

其中包含Flink应作为作业依赖项动态加载的每个类的一行。例如:

org.apache.flink.fs.s3hadoop.S3FileSystemFactory
org.apache.flink.fs.s3hadoop.S3AFileSystemFactory

我正在使用sbt汇编来创建一个远JAR。在我的项目依赖项中,我将flink-s3-fs-hadoop作为一个提供的依赖项,这阻止了正确的服务文件被包括在内。一旦我删除了那个限定符,就创建了正确的服务,一切都正常了。

最新更新