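I am trying to run a simple Apache Flink script with Kafka integration, but I keep running into problems when executing it. The script should read messages coming from a Kafka producer, process them, and then send the results back to another topic. I got this example from here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Simple-Flink-Kafka-Test-td4828.html
The error I get is: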
Exception in thread "main" java.lang.NoSuchFieldError: ALL
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:86)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:429)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:46)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:33)
Here is my code:
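import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class App {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        //properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "javaflink");
        // Read strings from the Kafka topic "test"
        DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer010<String>("test", new SimpleStringSchema(), properties));
        System.out.println("Step D");
        // Prefix each message and write the result to the Kafka topic "demo2"
        messageStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // TODO Auto-generated method stub
                return "Blablabla " + value;
            }
        }).addSink(new FlinkKafkaProducer010<String>("localhost:9092", "demo2", new SimpleStringSchema()));
        env.execute();
    }
}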
These are the dependencies in pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java_2.11</artifactId>
    <version>0.10.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
What could be causing this kind of error?
Thanks, Luca
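The problem is most likely caused by the mix of different Flink versions you have defined in your pom.xml: flink-java_2.11 is at 0.10.2 and flink-streaming-core is at 0.9.1, while the other artifacts are at 1.3.1. A NoSuchFieldError typically means that a class was compiled against one version of a library and is running against another version in which that field does not exist. To run this program, it should be sufficient to include the following dependencies: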
<!-- Streaming API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
<!-- In order to execute the program from within your IDE -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
<!-- Kafka connector dependency -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
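If you want to confirm that mixed versions are really on the classpath, one option is to print which jar each class from the stack trace is loaded from. The following is only a rough sketch, not part of Flink: the class name ClasspathProbe and its helper methods are made up for illustration, and the class names it checks are simply the ones that appear in the stack trace above. It looks classes up by name, so it compiles without Flink and reports classes that are missing instead of failing.

```java
import java.security.CodeSource;

// Hypothetical diagnostic helper (not part of Flink): reports where classes are loaded from.
public class ClasspathProbe {

    /** Returns the jar or directory a class was loaded from. */
    public static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        // Classes that ship with the JDK normally have no code source.
        if (source == null || source.getLocation() == null) {
            return "(no code source)";
        }
        return source.getLocation().toString();
    }

    /** Looks the class up by name, without initializing it, and reports its location. */
    public static String locationOfName(String className) {
        try {
            Class<?> type = Class.forName(className, false, ClasspathProbe.class.getClassLoader());
            return locationOf(type);
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not loadable: " + e.getClass().getSimpleName() + ")";
        }
    }

    public static void main(String[] args) {
        // Class names taken from the stack trace in the question.
        String[] names = {
            "org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator",
            "org.apache.flink.streaming.api.graph.StreamGraph",
            "org.apache.flink.streaming.api.environment.LocalStreamEnvironment"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locationOfName(name));
        }
    }
}
```

Run it with the same classpath as the job. If one of the printed locations points at a jar whose version is not 1.3.1, that dependency is a likely culprit. Running `mvn dependency:tree` gives the same picture from the Maven side.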