我有一个Java应用程序,其中我正在使用Flink Api
。因此,基本上我要使用该代码的方法是创建两个具有很少记录的数据集,然后将其注册为两个表以及必要的字段。
DataSet<Company> comp = env.fromElements(
new Company("Aux", 1),
new Company("Comp2", 2),
new Company("Comp3", 3));
DataSet<Employee> emp = env.fromElements(
new Employee("Kula", 1),
new Employee("Ish", 1),
new Employee("Kula", 3));
tEnv.registerDataSet("Employee", emp, "name, empId");
tEnv.registerDataSet("Company", comp, "cName, empId");
,然后我试图使用Table API
加入这两个表:
Table anotherJoin = tEnv.sql("SELECT Employee.name, Employee.empId, Company.cName FROM " +
"Employee RIGHT JOIN Company on Employee.empId = Company.empId");
我只是在控制台上打印结果。这在我的机器上完美地在本地。我通过将maven-shade-plugin
与依赖关系一起创建了fat-jar
,并且正在尝试在AWS Lambda
中执行它。
所以当我尝试在那里执行它时,我会被以下例外抛弃(我只发布前几行(:
reference.conf @ file:/var/task/reference.conf:804:无法解决 替换为一个值:$ {akka.stream.materializer}: com.typesafe.config.configexception $未解决的订阅 com.typesafe.config.configexception $未解决的申请: Referent.conf @ file:/var/task/reference.conf:804:无法解析 替换为一个值:$ {akka.stream.materializer} at com.typesafe.config.impl.configreference.resolvesubstitutions(configreference.java:111( 在 com.typesafe.config.impl.resolvecontext.realresolve(resolvecontext.java:179( 在 com.typesafe.config.impl.resolvecontext.resolve(resolvecontext.java:142(
我在lambda执行罐子之前提取了罐子,碰巧看到所有依赖项都在那里。我不知道它在哪里出错?
任何帮助都可以不胜感激。
您需要在pom-> maven -shaded -plugin->配置部分中添加此代码:
<transformers>
<!-- append default configs -->
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
最终能够解决这个问题,这是我的POM中的一些主要版本问题。然后,我将所有依赖项降级到Flink 1.3.2
,并在shade
插件中添加了<relocations>
。现在起作用。我正在附上整个POM,以便有一天可以帮助某人:
<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.ink.FlinkLambdaTest.FlinkToLambda</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
</transformers>
<relocations>
<relocation>
<pattern>org.codehaus.plexus.util</pattern>
<shadedPattern>org.shaded.plexus.util</shadedPattern>
<excludes>
<exclude>org.codehaus.plexus.util.xml.Xpp3Dom</exclude>
<exclude>org.codehaus.plexus.util.xml.pull.*</exclude>
</excludes>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.10</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.10</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.10</artifactId>
<version>1.3.2</version>
</dependency>
</dependencies>
确保使用您的更改主类。
gradle:
buildscript {
repositories {
maven {
url 'https://plugins.gradle.org/m2/'
}
}
dependencies {
classpath 'com.github.jengelman.gradle.plugins:shadow:2.0.4'
}
}
apply plugin: 'com.github.johnrengelman.shadow'
然后,在汇总akka依赖的项目中:
shadowJar {
include 'reference.conf'
}
使用此处,您应该能够正常构建(例如,调用组装和构建任务(。
Intellij Idea也支持这一点。ftw!它还支持下面的SBT结构。
lazy val assemblySettings = Seq(
assemblyJarName in assembly := name.value + ".jar",
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case _ => MergeStrategy.first
})
对于SBT用户,这对我有用。
assemblyMergeStrategy in assembly := {
{
case PathList("META-INF", xs@_*) => MergeStrategy.discard
case "reference.conf" => MergeStrategy.concat
case x => MergeStrategy.first
}
}
这可能会迟到一些,但这是Flink网站上有关此主题的答案。
还排除了有关Meta-Inf文件夹的信息,这可能会在使用JAR时会引起安全问题。
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>my.programs.main.clazz</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>