Flink versions tried: 1.4.0, 1.4.1, 1.4.2

When I try to run this simple Flink application:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.fromElements("a", "b", "c")
  .addSink(new BucketingSink[String]("file:///Users/joshlemer/projects/my-project/target/output"))

I get the following runtime exception:
Exception in thread "main" java.lang.NoClassDefFoundError: Lorg/apache/hadoop/fs/FileSystem;
at java.lang.Class.getDeclaredFields0(Native Method)
at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
at java.lang.Class.getDeclaredFields(Class.java:1916)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:72)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1550)
at org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:184)
at org.apache.flink.streaming.api.datastream.DataStream.addSink(DataStream.java:1134)
at org.apache.flink.streaming.api.scala.DataStream.addSink(DataStream.scala:1036)
at com.company.project.Job$.run(Job.scala:52)
at com.company.project.Job$.main(Job.scala:28)
at com.company.project.Job.main(Job.scala)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.fs.FileSystem
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
This happens even though I can write to text files with dataStream.writeAsText(...).

My build.sbt is also fairly typical:
val flinkVersion = "1.4.2"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-statebackend-rocksdb" % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka-0.11" % flinkVersion,
  "org.apache.flink" %% "flink-connector-filesystem" % flinkVersion,
  "org.apache.flink" %% "flink-test-utils" % flinkVersion % "test",
  "org.apache.flink" % "flink-test-utils-junit" % flinkVersion % "test"
)
along with the extra idea.sbt that Flink recommends for IntelliJ users:
lazy val mainRunner = project.in(file("mainRunner")).dependsOn(RootProject(file("."))).settings(
  // we set all provided dependencies to none, so that they are included in the classpath of mainRunner
  libraryDependencies := (libraryDependencies in RootProject(file("."))).value.map { module =>
    if (module.configurations.equals(Some("provided"))) {
      module.copy(configurations = None)
    } else {
      module
    }
  }
)
which is what I use to run the application in IntelliJ (with mainRunner set as the application's classpath).

I'm confused as to why this happens, and in particular why the package name starts with "Lorg" instead of "org"?

Thanks!
From the 1.4 release notes:

Starting with version 1.4, Flink can run without any Hadoop dependencies on the classpath. Besides simply running without Hadoop, this allows Flink to dynamically use whatever Hadoop version is available on the classpath.

For example, you can download the Hadoop-free release of Flink and use it to run on any supported version of YARN, and Flink will dynamically pick up the Hadoop dependencies from YARN.

This also means that in cases where you use connectors to HDFS, such as the BucketingSink or RollingSink, you now have to make sure that you either use a Flink distribution with bundled Hadoop dependencies, or make sure to include the Hadoop dependencies when building a jar file for your application.
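Reading those notes, one way to satisfy the "include the Hadoop dependencies" requirement might be to declare a Hadoop client dependency directly in build.sbt, so that org.apache.hadoop.fs.FileSystem is on the application classpath at runtime. This is only a sketch of what I understand the notes to mean; the exact artifact (hadoop-client) and the version number below are my assumptions, not something confirmed by the Flink docs:

```scala
// build.sbt (sketch) — add Hadoop's client libraries so that
// org.apache.hadoop.fs.FileSystem can be loaded at runtime.
// The artifact name and version are assumptions; the version should
// presumably match the Hadoop version of the target cluster.
libraryDependencies += "org.apache.hadoop" % "hadoop-client" % "2.8.3"
```

Presumably this dependency would also need to be left out of the "provided" scope (or handled by the mainRunner trick above) so that it is actually present when running from IntelliJ.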