I am experimenting with Flink and hit an exception when deploying a job to a Flink cluster (running on Kubernetes).
Setup:
Flink - 1.4.2
Scala - 2.11.12
Java 8 SDK
Flink Docker image on the cluster - flink:1.4.2-scala_2.11-alpine

SBT file
ThisBuild / resolvers ++= Seq(
  "Apache Development Snapshot Repository" at "https://repository.apache.org/content/repositories/snapshots/",
  Resolver.mavenLocal
)

name := "streamingTest1"

version := "0.1-SNAPSHOT"

organization := "com.example"

ThisBuild / scalaVersion := "2.11.12"

val flinkVersion = "1.4.2"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion % "provided"
)

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= flinkDependencies
  )

libraryDependencies += "com.microsoft.azure" % "azure-eventhubs" % "1.0.0"
libraryDependencies += "com.google.code.gson" % "gson" % "2.3.1"

excludeDependencies ++= Seq(
  ExclusionRule("org.ow2.asm", "*")
)

assembly / mainClass := Some("com.example.Job")

// make the run command include the provided dependencies
Compile / run := Defaults.runTask(Compile / fullClasspath,
  Compile / run / mainClass,
  Compile / run / runner
).evaluated

// exclude the Scala library from the assembly
assembly / assemblyOption := (assembly / assemblyOption).value.copy(includeScala = false)
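A note on the scopes above (my reading of the sbt/sbt-assembly semantics, not something the Flink docs state about this build): a "provided" dependency stays on the compile and run classpaths but is left out of the fat jar that sbt assembly produces; together with includeScala = false this keeps the job jar small, on the assumption that the cluster's runtime classpath supplies those classes itself.

```scala
// "provided" marks a dependency as supplied by the runtime environment:
// it is available at compile time, but sbt-assembly excludes it from the fat jar.
// Equivalent long form of one entry above, using sbt's built-in Provided configuration:
libraryDependencies += "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % Provided
```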
I can run the job locally on my machine, either from IDEA or via the sbt CLI, and I see the expected output / can send output to my sink.
When running it on the Flink server deployed on the Kubernetes cluster, I see the following exception.
Error
java.util.concurrent.CompletionException: org.apache.flink.util.FlinkException: Could not run the jar.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:90)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not run the jar.
... 9 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program caused an error:
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:93)
at org.apache.flink.client.program.ClusterClient.getOptimizedPlan(ClusterClient.java:334)
at org.apache.flink.runtime.webmonitor.handlers.JarActionHandler.getJobGraphAndClassLoader(JarActionHandler.java:87)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleJsonRequest$0(JarRunHandler.java:69)
... 8 more
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema
at com.example.Job$.main(Job.scala:126)
at com.example.Job.main(Job.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:525)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:417)
at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
... 11 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 20 more
More context
My project has a Scala class for the Flink job, and it also has Java code, in a separate directory, for a connector that interfaces with EventHub. My main Scala code does not depend on KeyedDeserializationSchema, but the connector does. KeyedDeserializationSchema most likely comes from the kafka connector dependency declared in my sbt file. Is there any reason the kafka connector would not be available when this job runs on the cluster?
How can I debug which version of the kafka connector is being loaded on the server? Is there another way of packaging the job that would force Flink to load the kafka connector?
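One way to answer the "what is actually loaded?" question from inside the job itself (a debugging sketch of my own, not a Flink facility): ask the JVM for the code-source location of the class, which tells you which jar the classloader resolved it from, or that it is missing entirely.

```scala
// Hypothetical debug helper: report where the JVM loaded a class from.
object ClassLocator {
  def locate(className: String): String =
    try {
      val src = Class.forName(className).getProtectionDomain.getCodeSource
      if (src == null) "loaded by the bootstrap classloader"
      else s"loaded from ${src.getLocation}"
    } catch {
      case _: ClassNotFoundException => "NOT on the classpath"
    }
}

// e.g. log this early in main(), before anything touches the connector:
// println(ClassLocator.locate(
//   "org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema"))
```

On the cluster this would print the jar path the class came from, so you can tell whether it was resolved from your fat jar or from Flink's lib directory.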
As I realized shortly after posting this question, the fix was to remove "provided" from the kafka connector dependency: a provided dependency is excluded from the fat jar, and unlike the core Flink modules, the kafka connector is not part of the cluster's runtime classpath, so its classes were missing at runtime.
val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-streaming-java" % flinkVersion % "provided",
  "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion
)
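To confirm the fix actually changed the packaging, a quick sanity check I would run after `sbt assembly` (a sketch of my own: `JarCheck`, the jar path, and the entry name are assumptions based on sbt-assembly's usual output location):

```scala
import java.util.jar.JarFile
import scala.collection.JavaConverters._

// Hypothetical helper: does a jar contain a given .class entry?
object JarCheck {
  def containsClass(jarPath: String, classEntry: String): Boolean = {
    val jar = new JarFile(jarPath)
    try jar.entries().asScala.exists(_.getName == classEntry)
    finally jar.close()
  }
}

// With "provided" removed, the connector classes should now be inside the fat jar:
// JarCheck.containsClass(
//   "target/scala-2.11/streamingTest1-assembly-0.1-SNAPSHOT.jar",
//   "org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.class")
```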