Apache Flink: No execution.target specified in your configuration file



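I wrote a program with the Flink framework that uses a custom data source. The custom source listens on a network socket for a stream of cryptocurrency ticker prices. The main method looks like this: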

public class Main {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment streamExecutionEnvironment = new StreamExecutionEnvironment();
        DataStream<TickerPrice> highestBidDataStream = streamExecutionEnvironment
                .addSource(new GdaxSourceFunction())
                .keyBy((tickerPrice) -> {
                    return tickerPrice.getExchange() + "_" + tickerPrice.getFromCurrency() + "_" + tickerPrice.getToCurrency();
                })
                .window(TumblingEventTimeWindows.of(Time.seconds(1)))
                .maxBy("highestBid");

        highestBidDataStream.print();
        streamExecutionEnvironment.execute("Gdax Highest bid window calculator");
    }
}

I run the program on a local cluster like this:

./start-cluster.sh
./flink run --target local /path/to/Projects/CryptoFlink/build/libs/CryptoFlink-1.0-SNAPSHOT.jar

I build it with this build.gradle, which is almost an exact copy of the one from the Apache Flink quickstart:

buildscript {
    repositories {
        jcenter() // this applies only to the Gradle 'Shadow' plugin
    }
    dependencies {
        classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
    }
}

plugins {
    id 'java'
    id 'application'
    id 'com.github.johnrengelman.shadow' version '6.1.0'
}

// artifact properties
group = 'com.cryptoflink'
version = '0.1-SNAPSHOT'
mainClassName = 'com.cryptoflink.main.Main'
description = """Cryptocurrency Flink Program"""

ext {
    javaVersion = '1.8'
    flinkVersion = '1.11.2'
    scalaBinaryVersion = '2.12'
    slf4jVersion = '1.7.15'
    log4jVersion = '2.12.1'
}

repositories {
    mavenCentral()
    maven { url "https://repository.apache.org/content/repositories/snapshots/" }
}

configurations {
    flinkShadowJar // dependencies which go into the shadowJar

    // always exclude these (also from transitive dependencies) since they are provided by Flink
    flinkShadowJar.exclude group: 'org.apache.flink', module: 'force-shading'
    flinkShadowJar.exclude group: 'com.google.code.findbugs', module: 'jsr305'
    flinkShadowJar.exclude group: 'org.slf4j'
    flinkShadowJar.exclude group: 'org.apache.logging.log4j'
}

def flinkVersion = '1.11.2'
dependencies {

    testCompile group: 'junit', name: 'junit', version: '4.12'

    flinkShadowJar 'javax.websocket:javax.websocket-api:1.1'

    flinkShadowJar 'com.fasterxml.jackson.core:jackson-databind:2.11.1'
    flinkShadowJar group: 'com.fasterxml.jackson.core', name: 'jackson-core', version: '2.9.4'
    flinkShadowJar group: 'com.fasterxml.jackson.core', name: 'jackson-annotations', version: '2.9.4'
    flinkShadowJar group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-hibernate5', version: '2.9.4'

    compile "org.apache.flink:flink-streaming-java_${scalaBinaryVersion}:${flinkVersion}"

    flinkShadowJar 'org.glassfish.tyrus:tyrus-server:1.12'
    flinkShadowJar 'org.glassfish.tyrus:tyrus-container-grizzly-server:1.12'

    compile "org.apache.logging.log4j:log4j-api:${log4jVersion}"
    compile "org.apache.logging.log4j:log4j-core:${log4jVersion}"
    compile "org.apache.logging.log4j:log4j-slf4j-impl:${log4jVersion}"
    compile "org.slf4j:slf4j-log4j12:${slf4jVersion}"
}

sourceSets {
    main.compileClasspath += configurations.flinkShadowJar
    main.runtimeClasspath += configurations.flinkShadowJar
    test.compileClasspath += configurations.flinkShadowJar
    test.runtimeClasspath += configurations.flinkShadowJar

    javadoc.classpath += configurations.flinkShadowJar
}

run.classpath = sourceSets.main.runtimeClasspath

jar {
    manifest {
        attributes 'Built-By': System.getProperty('user.name'),
                'Build-Jdk': System.getProperty('java.version')
    }
}

shadowJar {
    configurations = [project.configurations.flinkShadowJar]
}

But I always get this error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No execution.target specified in your configuration file.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:30)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: java.lang.NullPointerException: No execution.target specified in your configuration file.
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1798)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1711)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1697)
at com.cryptoflink.main.Main.main(Main.java:26)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:567)

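The Flink tutorials do not mention any such configuration file that the project needs in order to run. One tutorial does say that passing "--target local" to the run command should set execution.target to the local cluster, but that does not work.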

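What I have tried:

  1. Adding execution.target to flink-1.1.2/conf/flink-conf.yaml
  2. Specifying --target local and --target remote in the run command
  3. Running the Flink examples. These work, but I cannot tell from inspecting the JARs what the key difference is between my JAR and the example JARs
  4. Adding main/resources/conf/flink-conf.yaml and main/resources/flink-conf.yaml containing execution.target: local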

Versions:

JDK 1.8, Flink 1.11.2

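The solution is to obtain the StreamExecutionEnvironment from its factory method instead of constructing a new one. getExecutionEnvironment() returns the environment that the flink run client has prepared, including its execution.target, whereas an environment created with new starts from an empty configuration: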

StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

Thanks to Kostas Kloudas on the Flink mailing list for this solution.
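
To illustrate why the factory method matters, here is a minimal sketch in plain Java with no Flink dependency. It is a toy model, not Flink's actual implementation: ToyEnvironment, setContext and ExecutionTargetDemo are hypothetical names made up for this example, and the real client does considerably more. The only point is that a launcher can hand its configuration to a static factory, while a directly constructed object never sees it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical demo driver; plays the role of the CLI plus the user's main().
class ExecutionTargetDemo {
    public static void main(String[] args) {
        // The "launcher" prepares a configuration before user code runs.
        Map<String, String> cliConfig = new HashMap<>();
        cliConfig.put("execution.target", "remote");
        ToyEnvironment.setContext(cliConfig);

        // Factory method: sees the launcher's configuration.
        System.out.println(ToyEnvironment.getExecutionEnvironment().execute("demo"));

        // Direct construction: empty configuration, so execute() fails.
        try {
            new ToyEnvironment().execute("demo");
        } catch (NullPointerException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}

// Toy stand-in for an execution environment. This is NOT Flink's class.
class ToyEnvironment {
    // Assumption of this model: the launcher stores its configuration here.
    private static Map<String, String> contextConfig = null;

    private final Map<String, String> config;

    // Direct construction starts from an empty configuration.
    ToyEnvironment() {
        this.config = new HashMap<>();
    }

    private ToyEnvironment(Map<String, String> config) {
        this.config = new HashMap<>(config);
    }

    // Called by the toy launcher before the user's main() is invoked.
    static void setContext(Map<String, String> launcherConfig) {
        contextConfig = launcherConfig;
    }

    // Returns an environment carrying the launcher's configuration if one
    // was set; otherwise falls back to a local target.
    static ToyEnvironment getExecutionEnvironment() {
        if (contextConfig != null) {
            return new ToyEnvironment(contextConfig);
        }
        Map<String, String> local = new HashMap<>();
        local.put("execution.target", "local");
        return new ToyEnvironment(local);
    }

    // Refuses to run without a target, echoing the error message in the question.
    String execute(String jobName) {
        String target = config.get("execution.target");
        if (target == null) {
            throw new NullPointerException(
                    "No execution.target specified in your configuration file.");
        }
        return jobName + " -> " + target;
    }
}
```

In this model, no setting in a configuration file or on the command line can help the directly constructed instance, which would be consistent with the attempts listed in the question having no effect.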
