Flink + Kafka: getHostnamePort



I want to read a Kafka topic from Flink.

package Toletum.pruebas;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class LeeKafka {
  public static void main(String[] args) throws Exception {
    final ParameterTool parameterTool = ParameterTool.fromArgs(args);

    // create execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // create the Kafka source from the command-line properties
    FlinkKafkaConsumer082<String> kafkaSrc = new FlinkKafkaConsumer082<String>(
        "test02",
        new SimpleStringSchema(),
        parameterTool.getProperties());

    DataStream<String> messageStream = env.addSource(kafkaSrc);

    messageStream.rebalance().map(new MapFunction<String, String>() {
      private static final long serialVersionUID = -6867736771747690202L;

      @Override
      public String map(String value) throws Exception {
        return "Kafka and Flink says: " + value;
      }
    }).print();

    env.execute("LeeKafka");
  }
}

This code runs successfully:

java -cp Package.jar Toletum.pruebas.LeeKafka --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup
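For context, `ParameterTool.fromArgs(args).getProperties()` turns the `--key value` pairs above into the `Properties` object the Kafka consumer reads `bootstrap.servers`, `zookeeper.connect`, and `group.id` from. A simplified, hypothetical sketch of that parsing (not Flink's actual implementation, which handles more cases):

```java
import java.util.Properties;

public class ArgsToProps {
  // Simplified stand-in for ParameterTool.fromArgs(...).getProperties():
  // each "--key value" pair becomes one Properties entry.
  static Properties fromArgs(String[] args) {
    Properties props = new Properties();
    for (int i = 0; i + 1 < args.length; i += 2) {
      if (args[i].startsWith("--")) {
        props.setProperty(args[i].substring(2), args[i + 1]);
      }
    }
    return props;
  }

  public static void main(String[] args) {
    Properties p = fromArgs(new String[] {
        "--topic", "test02",
        "--bootstrap.servers", "kafka:9092",
        "--zookeeper.connect", "zookeeper:2181",
        "--group.id", "myGroup"
    });
    System.out.println(p.getProperty("bootstrap.servers")); // kafka:9092
    System.out.println(p.getProperty("group.id"));          // myGroup
  }
}
```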

But when I try to run it with Flink:

flink run -c Toletum.pruebas.LeeKafka pruebas-0.0.1-SNAPSHOT-jar-with-dependencies.jar --topic test02 --bootstrap.servers kafka:9092 --zookeeper.connect zookeeper:2181 --group.id myGroup

I get this error:

java.lang.NoSuchMethodError: org.apache.flink.util.NetUtils.getHostnamePort(Ljava/lang/String;)Ljava/net/URL;
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.getPartitionsForTopic(FlinkKafkaConsumer.java:592)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer.<init>(FlinkKafkaConsumer.java:280)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer082.<init>(FlinkKafkaConsumer082.java:49)
        at Toletum.pruebas.LeeKafka.main(LeeKafka.java:22)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:497)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:395)
        at org.apache.flink.client.program.Client.runBlocking(Client.java:252)
        at org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:676)
        at org.apache.flink.client.CliFrontend.run(CliFrontend.java:326)
        at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:978)
        at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1028)
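A `NoSuchMethodError` like this is the JVM's link-time complaint: the `NetUtils` class actually loaded at run time (from the cluster's Flink distribution) lacks the exact method signature the jar was compiled against. The same signature check can be illustrated with reflection; this standalone sketch uses `java.lang.String` as a stand-in, since the real Flink classes are not needed to show the mechanism:

```java
public class SignatureProbe {
  // Returns true if the class exposes a public method with this exact
  // parameter signature -- the same resolution the JVM performs when
  // linking a call site; when it fails at link time, the result is a
  // NoSuchMethodError instead of an exception.
  static boolean hasMethod(Class<?> cls, String name, Class<?>... params) {
    try {
      cls.getMethod(name, params);
      return true;
    } catch (NoSuchMethodException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    // String.substring(int) exists...
    System.out.println(hasMethod(String.class, "substring", int.class));
    // ...but String.substring(double) does not, just as the Flink version
    // on the cluster lacked the NetUtils.getHostnamePort signature the
    // connector in the fat jar expected.
    System.out.println(hasMethod(String.class, "substring", double.class));
  }
}
```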

Old library versions...

The correct pom.xml:


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka</artifactId>
  <version>0.10.1</version>
</dependency>

This problem is caused by using an old version of the Flink connector library.

Check which libraries are currently available and pull in the latest Maven dependency.

The Kafka version you are running should also be taken into account.

Try the latest Maven dependency for the Kafka connector from the Flink documentation.

The latest Maven dependency is:

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.8_2.10</artifactId>
  <version>1.3.2</version>
</dependency>
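Note that with the 1.x connector the consumer class name changes as well: the connector for Kafka 0.8 provides `FlinkKafkaConsumer08` in place of the old `FlinkKafkaConsumer082`. A sketch of the updated source creation, assuming the same topic and connection settings as above (requires the Flink 1.3.x jars on the classpath):

```java
import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class LeeKafkaUpdated {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "kafka:9092");
    props.setProperty("zookeeper.connect", "zookeeper:2181");
    props.setProperty("group.id", "myGroup");

    // FlinkKafkaConsumer08 replaces FlinkKafkaConsumer082 in the 1.x API
    DataStream<String> messageStream = env.addSource(
        new FlinkKafkaConsumer08<String>("test02", new SimpleStringSchema(), props));

    messageStream.print();
    env.execute("LeeKafkaUpdated");
  }
}
```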
