Kafka - Apache Flink execution log4j error



I am trying to run a simple Apache Flink script with Kafka integration, but I keep running into problems on execution. The script should read messages coming from a Kafka producer, elaborate them, and then send the result back out to another topic. I got the example from here: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Simple-Flink-Kafka-Test-td4828.html

The error I get is:

Exception in thread "main" java.lang.NoSuchFieldError: ALL
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:86)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:429)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:46)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:33)

This is my code:

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class App {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        //properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", "javaflink");

        DataStream<String> messageStream = env.addSource(
                new FlinkKafkaConsumer010<String>("test", new SimpleStringSchema(), properties));

        System.out.println("Step D");

        messageStream.map(new MapFunction<String, String>() {
            public String map(String value) throws Exception {
                return "Blablabla " + value;
            }
        }).addSink(new FlinkKafkaProducer010<String>("localhost:9092", "demo2", new SimpleStringSchema()));

        env.execute();
    }
}

These are the pom.xml dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java_2.11</artifactId>
    <version>0.10.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-core</artifactId>
    <version>0.9.1</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.3.1</version>
</dependency>

What could cause this kind of error?

Thanks, Luca

The problem is most likely caused by the mix of different Flink versions you have defined in your pom.xml (1.3.1, 0.10.2, and 0.9.1 side by side). To run this program, it is sufficient to include the following dependencies:

<!-- Streaming API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
<!-- In order to execute the program from within your IDE -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
<!-- Kafka connector dependency -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
    <version>1.3.1</version>
</dependency>
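With consistent versions in place the job should submit (you can verify what Maven actually resolved with `mvn dependency:tree`). As a side note, since the record transformation is plain Java, it can be sanity-checked without a Kafka broker or a Flink cluster at all; a minimal sketch (the class name here is illustrative, not from the original project):

```java
// Standalone check of the transformation used in the job's MapFunction,
// with no Flink or Kafka dependencies on the classpath.
public class MapLogicCheck {

    // Same logic as the anonymous MapFunction in the question.
    static String map(String value) {
        return "Blablabla " + value;
    }

    public static void main(String[] args) {
        System.out.println(map("hello"));  // prints "Blablabla hello"
    }
}
```

This separates "my logic is wrong" failures from classpath/version failures like the `NoSuchFieldError` above, which occurs during job-graph generation, before any record is processed.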
