我正在尝试开发弗林克流的作业。这项工作应从Kafka主题中阅读。
我已经尝试在https://github.com/dataartisans/kafka-example/blob/master/src/src/main/java/com/com/dataartisans/readfromkafka.java
上更新该示例。我想使用Flink 1.4和Kafka 0.11。
当我尝试构建(maven)项目时,我会收到以下错误:
[ERROR] /quickstart/src/main/java/org/myorg/quickstart/StreamingJob.java:[20,66] cannot access org.apache.flink.api.common.serialization.DeserializationSchema
class file for org.apache.flink.api.common.serialization.DeserializationSchema not found
[INFO] 1 error
...
[ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.1:compile (default-compile) on project quickstart: Compilation failure
[ERROR] /quickstart/src/main/java/org/myorg/quickstart/StreamingJob.java:[20,66] cannot access org.apache.flink.api.common.serialization.DeserializationSchema
[ERROR] class file for org.apache.flink.api.common.serialization.DeserializationSchema not found
有什么想法如何解决此错误?到目前为止,我找不到解决方案。
文件
streamjob.java
package org.myorg.quickstart;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
public class StreamingJob {
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// parse user parameters
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer08<String>(parameterTool.getRequired("topic"), (KeyedDeserializationSchema) new JSONKeyValueDeserializationSchema(true), parameterTool.getProperties()));
// print() will write the contents of the stream to the TaskManager's standard out stream
// the rebelance call is causing a repartitioning of the data so that all machines
// see the messages (for example in cases when "num kafka partitions" < "num flink operators"
messageStream.rebalance().map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return "Kafka and Flink says: " + value;
}
}).print();
// execute program
env.execute("Flink Streaming Java API Skeleton");
}
}
pom.xml
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.myorg.quickstart</groupId>
<artifactId>quickstart</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
<url>http://www.myorganization.org</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.3.0</flink.version>
<slf4j.version>1.7.7</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<scala.binary.version>2.10</scala.binary.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<!--
Execute "mvn clean package -Pbuild-jar"
to build a jar file out of this project!
How to use the Flink Quickstart pom:
a) Adding new dependencies:
You can add dependencies to the list below.
Please check if the maven-shade-plugin below is filtering out your dependency
and remove the exclude from there.
b) Build a jar for running on the cluster:
There are two options for creating a jar from this project
b.1) "mvn clean package" -> this will create a fat jar which contains all
dependencies necessary for running the jar created by this pom in a cluster.
The "maven-shade-plugin" excludes everything that is provided on a running Flink cluster.
b.2) "mvn clean package -Pbuild-jar" -> This will also create a fat-jar, but with much
nicer dependency exclusion handling. This approach is preferred and leads to
much cleaner jar files.
-->
<dependencies>
<!-- Apache Flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- explicitly add a standard loggin framework, as Flink does not have
a hard dependency on one specific framework by default -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.8_2.11</artifactId>
<version>1.4-SNAPSHOT</version>
</dependency>
</dependencies>
<profiles>
<profile>
<!-- Profile for packaging correct JAR files -->
<id>build-jar</id>
<activation>
<activeByDefault>false</activeByDefault>
</activation>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.10</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.10</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<!-- disable the exclusion rules -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes combine.self="override"></excludes>
</artifactSet>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
<build>
<plugins>
<!-- We use the maven-shade plugin to create a fat jar that contains all dependencies
except flink and its transitive dependencies. The resulting fat-jar can be executed
on a cluster. Change the value of Program-Class if your program entry point changes. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<!-- This list contains all dependencies of flink-dist
Everything else will be packaged into the fat-jar
-->
<exclude>org.apache.flink:flink-annotations</exclude>
<exclude>org.apache.flink:flink-shaded-hadoop2</exclude>
<exclude>org.apache.flink:flink-shaded-curator-recipes</exclude>
<exclude>org.apache.flink:flink-core</exclude>
<exclude>org.apache.flink:flink-java</exclude>
<exclude>org.apache.flink:flink-scala_2.10</exclude>
<exclude>org.apache.flink:flink-runtime_2.10</exclude>
<exclude>org.apache.flink:flink-optimizer_2.10</exclude>
<exclude>org.apache.flink:flink-clients_2.10</exclude>
<exclude>org.apache.flink:flink-avro_2.10</exclude>
<exclude>org.apache.flink:flink-examples-batch_2.10</exclude>
<exclude>org.apache.flink:flink-examples-streaming_2.10</exclude>
<exclude>org.apache.flink:flink-streaming-java_2.10</exclude>
<exclude>org.apache.flink:flink-streaming-scala_2.10</exclude>
<exclude>org.apache.flink:flink-scala-shell_2.10</exclude>
<exclude>org.apache.flink:flink-python</exclude>
<exclude>org.apache.flink:flink-metrics-core</exclude>
<exclude>org.apache.flink:flink-metrics-jmx</exclude>
<exclude>org.apache.flink:flink-statebackend-rocksdb_2.10</exclude>
<!-- Also exclude very big transitive dependencies of Flink
WARNING: You have to remove these excludes if your code relies on other
versions of these dependencies.
-->
<exclude>log4j:log4j</exclude>
<exclude>org.scala-lang:scala-library</exclude>
<exclude>org.scala-lang:scala-compiler</exclude>
<exclude>org.scala-lang:scala-reflect</exclude>
<exclude>com.data-artisans:flakka-actor_*</exclude>
<exclude>com.data-artisans:flakka-remote_*</exclude>
<exclude>com.data-artisans:flakka-slf4j_*</exclude>
<exclude>io.netty:netty-all</exclude>
<exclude>io.netty:netty</exclude>
<exclude>commons-fileupload:commons-fileupload</exclude>
<exclude>org.apache.avro:avro</exclude>
<exclude>commons-collections:commons-collections</exclude>
<exclude>org.codehaus.jackson:jackson-core-asl</exclude>
<exclude>org.codehaus.jackson:jackson-mapper-asl</exclude>
<exclude>com.thoughtworks.paranamer:paranamer</exclude>
<exclude>org.xerial.snappy:snappy-java</exclude>
<exclude>org.apache.commons:commons-compress</exclude>
<exclude>org.tukaani:xz</exclude>
<exclude>com.esotericsoftware.kryo:kryo</exclude>
<exclude>com.esotericsoftware.minlog:minlog</exclude>
<exclude>org.objenesis:objenesis</exclude>
<exclude>com.twitter:chill_*</exclude>
<exclude>com.twitter:chill-java</exclude>
<exclude>commons-lang:commons-lang</exclude>
<exclude>junit:junit</exclude>
<exclude>org.apache.commons:commons-lang3</exclude>
<exclude>org.slf4j:slf4j-api</exclude>
<exclude>org.slf4j:slf4j-log4j12</exclude>
<exclude>log4j:log4j</exclude>
<exclude>org.apache.commons:commons-math</exclude>
<exclude>org.apache.sling:org.apache.sling.commons.json</exclude>
<exclude>commons-logging:commons-logging</exclude>
<exclude>commons-codec:commons-codec</exclude>
<exclude>com.fasterxml.jackson.core:jackson-core</exclude>
<exclude>com.fasterxml.jackson.core:jackson-databind</exclude>
<exclude>com.fasterxml.jackson.core:jackson-annotations</exclude>
<exclude>stax:stax-api</exclude>
<exclude>com.typesafe:config</exclude>
<exclude>org.uncommons.maths:uncommons-maths</exclude>
<exclude>com.github.scopt:scopt_*</exclude>
<exclude>commons-io:commons-io</exclude>
<exclude>commons-cli:commons-cli</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>org.apache.flink:*</artifact>
<excludes>
<!-- exclude shaded google but include shaded curator -->
<exclude>org/apache/flink/shaded/com/**</exclude>
<exclude>web-docs/**</exclude>
</excludes>
</filter>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<!-- If you want to use ./bin/flink run <quickstart jar> uncomment the following lines.
This will add a Main-Class entry to the manifest file -->
<!--
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.myorg.quickstart.StreamingJob</mainClass>
</transformer>
</transformers>
-->
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.7</source> <!-- If you want to use Java 8, change this to "1.8" -->
<target>1.7</target> <!-- If you want to use Java 8, change this to "1.8" -->
</configuration>
</plugin>
</plugins>
<!-- If you want to use Java 8 Lambda Expressions uncomment the following lines -->
<!--
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<compilerId>jdt</compilerId>
</configuration>
<dependencies>
<dependency>
<groupId>org.eclipse.tycho</groupId>
<artifactId>tycho-compiler-jdt</artifactId>
<version>0.21.0</version>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<versionRange>[2.4,)</versionRange>
<goals>
<goal>single</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
-->
</build>
</project>
我认为这是因为您试图根据pom.xml。
使用flink 1.3.0<flink.version>1.3.0</flink.version>
DeserializationChema在org.apache.flink.streaming.util.serialization中为1.3.0。不是它试图看的地方。应该能够将版本更改为pom.xml中的1.4.1,为
<flink.version>1.4.1</flink.version>
您需要添加此Maven依赖性:
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-core -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.4.0</version>
</dependency>