当我从flinkkafkaconsumer09更改为flinkkafkaconsumer 时出现新错误 眨眼代码:
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import java.util.Properties;
@SuppressWarnings("deprecation")
public class ReadFromKafka {
public static void main(String[] args) throws Exception {
// create execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test-consumer-group");
DataStream<String> stream = env
.addSource(new FlinkKafkaConsumer<String>("test4", new SimpleStringSchema(), properties));
stream.map(new MapFunction<String, String>() {
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception {
return "Stream Value: " + value;
}
}).print();
env.execute();
}
}
错误: log4j:WARN 找不到记录器 (org.apache.flink.api.java.ClosureCleaner( 的追加器。 log4j:警告 请正确初始化 log4j 系统。 log4j:警告 有关详细信息,请参阅 http://logging.apache.org/log4j/1.2/faq.html#noconfig。 线程"main"org.apache.flink.runtime.client.JobExecutionException中的异常:作业执行失败。 at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146( at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:626( at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117( at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507( at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1489( at ReadFromKafka.main(ReadFromKafka.java:33( 由以下原因引起:org.apache.kafka.common.errors.TimeoutException:获取主题元数据时超时已过期
绒球.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.dataartisans</groupId>
<artifactId>kafka-example</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>kafkaex</name>
<description>this is flink kafka example</description>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>1.9.1</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1</version>
</dependency>
</dependencies>
</project>
flink-connector-kafka_2.12
与FlinkKafkaConsumer09
不兼容。
flink-connector-kafka_2.12
是一个"通用"的kafka连接器,编译用于Scala 2.12。此通用连接器可用于 0.11.0 开始的任何版本的 Kafka。
FlinkKafkaConsumer09
适用于 Kafka 0.9.x。如果你的 Kafka 代理运行的是 Kafka 0.9.x,那么你将需要flink-connector-kafka-0.9_2.11
或flink-connector-kafka-0.9_2.12
,这取决于你想要哪个版本的 Scala。
另一方面,如果您的 Kafka 代理运行的是最新版本的 Kafka(0.11.0 或更高版本(,请坚持使用flink-connector-kafka_2.12
并使用FlinkKafkaConsumer
而不是FlinkKafkaConsumer09
。
有关详细信息,请参阅文档。