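I wrote a Flink pipeline that writes a data stream to files in Parquet format. I use the sinkTo
method to write the output to files. When the application starts, I get the following exception: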
java.lang.RuntimeException: Could not look up the main(String[]) method from the class com.fk.logs.StreamingJob: org/apache/flink/api/connector/sink2/Sink
at org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:315)
at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:161)
at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)
at org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)
at org.apache.flink.client.cli.CliFrontend.buildProgram(CliFrontend.java:851)
at org.apache.flink.client.cli.CliFrontend.getPackagedProgram(CliFrontend.java:271)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:245)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/api/connector/sink2/Sink
at java.base/java.lang.Class.getDeclaredMethods0(Native Method)
at java.base/java.lang.Class.privateGetDeclaredMethods(Class.java:3166)
at java.base/java.lang.Class.getMethodsRecursive(Class.java:3307)
at java.base/java.lang.Class.getMethod0(Class.java:3293)
at java.base/java.lang.Class.getMethod(Class.java:2106)
at org.apache.flink.client.program.PackagedProgram.hasMainMethod(PackagedProgram.java:307)
... 10 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.api.connector.sink2.Sink
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:589)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:64)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
Code for reference:
DataStream<String> stream = env.fromSource(getSource(parser), WatermarkStrategy.noWatermarks(), "event source")
        .uid("log file event source")
        .filter(f -> !f.isEmpty());
readLogFile(parser, stream)
        .flatMap((value, out) -> value.lines().forEach(out::collect), TypeInformation.of(String.class))
        .map(StreamingJob::parseLine)
        .filter(Optional::isPresent)
        .map(Optional::get, mapType)
        .map(lm -> {
            if (lm.containsKey("msg")) {
                parseLine(lm.get("msg").toString()).ifPresent(mf -> lm.putAll(mf));
            }
            return lm;
        }, mapType)
        .keyBy(m -> m.computeIfAbsent("appid", k -> "unknownapp"))
        .map(m -> newRecord(m))
        .sinkTo(getSink());
I am using Flink 1.15.
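The problem appears to have been the Flink 1.15 installation that I had downloaded and was running my application on. I wiped that installation and reinstalled it, and the job now runs normally.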
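For anyone hitting the same ClassNotFoundException, one way to confirm whether an installation actually provides org.apache.flink.api.connector.sink2.Sink is to try loading the class by name and print the jar it came from. The following is only a minimal sketch: the class name ClasspathCheck and the helper locate are hypothetical names made up for this illustration, and it assumes you run it with the jars from your Flink distribution's lib directory on the classpath.

```java
import java.net.URL;
import java.security.CodeSource;
import java.util.Optional;

// Hypothetical diagnostic helper, not part of Flink.
public class ClasspathCheck {

    // Returns where the class was loaded from, or empty if it cannot be loaded.
    static Optional<String> locate(String className) {
        try {
            // initialize=false: load the class without running its static initializers
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            URL location = (src == null) ? null : src.getLocation();
            // JDK classes typically have no code source, so report that explicitly
            return Optional.of(location == null ? "<bootstrap or unknown>" : location.toString());
        } catch (ClassNotFoundException | LinkageError e) {
            // LinkageError also covers NoClassDefFoundError
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // Defaults to the class named in the stack trace above
        String name = args.length > 0 ? args[0] : "org.apache.flink.api.connector.sink2.Sink";
        System.out.println(name + " -> " + locate(name).orElse("NOT FOUND on this classpath"));
    }
}
```

If this reports NOT FOUND when run against the installation's lib jars, the installed distribution is probably incomplete or of a different version than the one the job was built against, which would be consistent with a reinstall fixing the error.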