Kafka -> Storm -> Flink: unexpected block data

I am migrating a topology from Storm to Flink. The topology has been simplified to KafkaSpout -> Bolt. The bolt just counts packets and does not try to decode them.

The compiled .jar is submitted to Flink via flink run -c <entry point> <path to .jar> and fails with the following error:

java.lang.Exception: Call to registerInputOutput() of invokable failed
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:190)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:174)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
        ... 1 more
Caused by: java.io.StreamCorruptedException: unexpected block data
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1365)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
        at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
        at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:255)
        at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:175)
        ... 3 more

My question(s):

  1. Did I miss a configuration step with the KafkaSpout? This works in vanilla Storm.
  2. Is there a specific version of the Storm libraries I need to use? I am including 0.9.4 in my build.
  3. Is there anything else I might have missed?

Should I be using the Storm KafkaSpout at all, or am I better off writing my own consumer with the Flink KafkaSource?


EDIT:

Here is the relevant code. Topology:

BrokerHosts brokerHosts = new ZkHosts(configuration.getString("kafka/zookeeper"));
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, configuration.getString("kafka/topic"), "/storm_env_values", "storm_env_DEBUG");
FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
builder.setSpout("environment", new KafkaSpout(kafkaConfig), 1);
builder.setBolt("decode_bytes", new EnvironmentBolt(), 1).shuffleGrouping("environment");

Initialization:

FlinkLocalCluster cluster = new FlinkLocalCluster(); // replaces: LocalCluster cluster = new LocalCluster();
cluster.submitTopology("env_topology", conf, buildTopology());

The bolt is based on BaseRichBolt. Its execute() fn just logs the presence of any packet for debugging; there is no other code.
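For reference, a minimal sketch of such a bolt might look like the following. This is illustrative only (the original EnvironmentBolt is not shown in the question), and it assumes the backtype.storm.* packages of Storm 0.9.x:

```java
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;

// Illustrative sketch of the counting/logging bolt described above.
public class EnvironmentBolt extends BaseRichBolt {
    private OutputCollector collector;
    private long count = 0;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Count the packet and log its presence; no decoding is attempted.
        count++;
        System.out.println("packet #" + count + ": " + input);
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: declares no output fields.
    }
}
```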

I just had a look at this. There is one problem right now, but I got it working locally. You can apply this hot fix to your code and build the compatibility layer yourself.

  1. KafkaSpout registers metrics. However, metrics are not yet supported by the compatibility layer. You need to remove the exception in FlinkTopologyContext.registerMetric(...) and just return null. (There is already an open PR addressing this integration, so I did not want to push this hot fix to the master branch.)
  2. Furthermore, you need to add some configuration parameters to your query manually:

I just set some values here:

Config c = new Config();
List<String> zkServers = new ArrayList<String>();
zkServers.add("localhost");
c.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
c.put(Config.STORM_ZOOKEEPER_PORT, 2181);
c.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 30);
c.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 30);
c.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
c.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
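
The hot fix from step 1 above amounts to replacing the exception in FlinkTopologyContext.registerMetric(...) with a null return. A sketch of the patched method, assuming the signature mirrors Storm's TopologyContext:

```java
// Sketch of the hot fix from step 1: instead of throwing an
// UnsupportedOperationException, simply return null, since the
// compatibility layer does not support metrics yet.
@Override
public <T extends IMetric> T registerMetric(final String name, final T metric,
        final int timeBucketSizeInSecs) {
    return null; // was: throw new UnsupportedOperationException(...)
}
```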
  • You also need to add some additional dependencies to your project:
  • Besides flink-storm, you need:

    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-kafka</artifactId>
        <version>0.9.4</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.10</artifactId>
        <version>0.8.1.1</version>
    </dependency>
    

    This works for me, using Kafka_2.10-0.8.1.1 and FlinkLocalCluster executed within Eclipse.

    It also works on a local Flink cluster started via bin/start-local-streaming.sh. For this, when using the bin/flink run command, you need to use FlinkSubmitter instead of FlinkLocalCluster. Additionally, your jar needs the following dependencies:

    <include>org.apache.storm:storm-kafka</include>
    <include>org.apache.kafka:kafka_2.10</include>
    <include>org.apache.curator:curator-client</include>
    <include>org.apache.curator:curator-framework</include>
    <include>com.google.guava:guava</include>
    <include>com.yammer.metrics:metrics-core</include>
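
For the cluster case, the submission code from the edit above changes roughly as follows. This is a sketch only; it assumes FlinkSubmitter.submitTopology mirrors Storm's StormSubmitter API, which is the compatibility layer's intent:

```java
// Sketch: submitting to a running local Flink cluster instead of
// using FlinkLocalCluster inside the JVM.
Config conf = new Config();
// ... same Zookeeper settings as shown above ...
FlinkSubmitter.submitTopology("env_topology", conf, buildTopology());
```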
    
