Flink: Cannot find compatible factory for specified execution.target (=local)



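>I recently upgraded from Flink 1.9.0 to 1.10.0 and started getting this error when trying to execute a job locally. Surprisingly, it works fine in the IDE. I only get the error when I run the executable jar from the command line (java -jar).

It says here that a dependency needs to be added, but I already have it. Any ideas?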

For reference, I have:

"org.apache.flink" %% "flink-scala" % "1.10.0",
"org.apache.flink" %% "flink-streaming-scala" % "1.10.0",
"org.apache.flink" %% "flink-connector-kafka" % "1.10.0",


Exception in thread "main" java.lang.NullPointerException: Cannot find compatible factory for specified execution.target (=local)
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:104)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1726)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1634)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1602)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:667)
at workflow.task.engineTask.DERFTask.execute(DERFTask.scala:146)

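Edit: I did some debugging. When running from the command line (as a jar), the "factories" iterator appears to be empty, unlike when running from the IDE, so execution never enters the while loop. Strange.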

public PipelineExecutorFactory getExecutorFactory(Configuration configuration) {
    Preconditions.checkNotNull(configuration);
    List<PipelineExecutorFactory> compatibleFactories = new ArrayList();
    Iterator factories = defaultLoader.iterator();
    // When run from the jar, this iterator is empty and the loop body is skipped.
    while (factories.hasNext()) {
        try {
            PipelineExecutorFactory factory = (PipelineExecutorFactory)factories.next();
            if (factory != null && factory.isCompatibleWith(configuration)) {
                compatibleFactories.add(factory);
            }
        } catch (Throwable var5) {
            if (!(var5.getCause() instanceof NoClassDefFoundError)) {
                throw var5;
            }
            LOG.info("Could not load factory due to missing dependencies.");
        }
    }
    if (compatibleFactories.size() > 1) {
        String configStr = (String)configuration.toMap().entrySet().stream().map((e) -> {
            return (String)e.getKey() + "=" + (String)e.getValue();
        }).collect(Collectors.joining("\n"));
        throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
    } else {
        // Returning null here is what later trips the checkNotNull in executeAsync.
        return compatibleFactories.isEmpty() ? null : (PipelineExecutorFactory)compatibleFactories.get(0);
    }
}

It looks like you are missing this dependency:

"org.apache.flink" %% "flink-clients" % "1.10.0"

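Putting the dependencies listed in the question together with the one suggested in this answer, the dependency section of build.sbt might look like the sketch below. The `flinkVersion` value is a name introduced here only for illustration; the artifact names and version are the ones quoted in this thread.

```scala
// build.sbt (sketch; `flinkVersion` is an illustrative name, not from the question)
val flinkVersion = "1.10.0"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala"           % flinkVersion,
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" %% "flink-connector-kafka" % flinkVersion,
  // The dependency suggested in this answer
  "org.apache.flink" %% "flink-clients"         % flinkVersion
)
```

Keeping the version in a single value makes it harder for the Flink artifacts to drift apart on the next upgrade.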
