我正在将拓扑从storm移动到flink。拓扑简化为KafkaSpout->Bolt
。螺栓只是计数数据包,而不是试图解码它们。
编译后的.jar通过flink -c <entry point> <path to .jar>
提交给flink,并出现以下错误:
java.lang.Exception: Call to registerInputOutput() of invokable failed
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:529)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:190)
at org.apache.flink.streaming.runtime.tasks.StreamTask.registerInputOutput(StreamTask.java:174)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:526)
... 1 more
Caused by: java.io.StreamCorruptedException: unexpected block data
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1365)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:294)
at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:255)
at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:175)
... 3 more
我的问题(s):
- 我错过了一个配置步骤w/s>KafkaSpout?这在香草风暴中是有效的。
- 是否有我需要使用的风暴库的特定版本?我包括0.9.4与我的构建。 还有什么我可能错过的吗?
我应该使用storm KafkaSpout还是使用flink KafkaSource编写自己的更好的服务?
编辑:以下是相关的代码: 拓扑:
BrokerHosts brokerHosts = new ZkHosts(configuration.getString("kafka/zookeeper"));
SpoutConfig kafkaConfig = new SpoutConfig(brokerHosts, configuration.getString("kafka/topic"), "/storm_env_values", "storm_env_DEBUG");
FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
builder.setSpout("environment", new KafkaSpout(kafkaConfig), 1);
builder.setBolt("decode_bytes", new EnvironmentBolt(), 1).shuffleGrouping("environment");
初始化:
FlinkLocalCluster cluster = new FlinkLocalCluster(); // replaces: LocalCluster cluster = new LocalCluster();
cluster.submitTopology("env_topology", conf, buildTopology());
该螺栓基于BaseRichBolt。execute() fn只记录任何要调试的数据包的存在。没有其他代码
我刚看了这个。现在有一个问题,但我让它在本地工作。您可以将此修复应用到您的代码中,并自己构建兼容层。
-
KafkaSpout
注册指标。但是,兼容性层目前不支持度量。您需要删除FlinkTopologyContext.registerMetric(...)
中的异常,只返回null
。(已经有一个开放的PR在整合参数,因此我不想把这个热修复推到主分支) - 此外,您需要手动为查询添加一些配置参数:
我只是在这里设置了一些值:
Config c = new Config();
List<String> zkServers = new ArrayList<String>();
zkServers.add("localhost");
c.put(Config.STORM_ZOOKEEPER_SERVERS, zkServers);
c.put(Config.STORM_ZOOKEEPER_PORT, 2181);
c.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 30);
c.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 30);
c.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
c.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5);
- 你需要添加一些额外的依赖到你的项目:
除了flink-storm
,你还需要:
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-kafka</artifactId>
<version>0.9.4</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
</dependency>
这对我来说是有效的,使用Kafka_2.10-0.8.1.1和FlinkLocalCluster
在Eclipse中执行。
它也可以在通过bin/start-local-streaming.sh
启动的本地Flink集群中工作。为此,使用bin/flink run
命令时,您需要使用FlinkSubmitter
而不是FlinkLocalCluster
。此外,您的jar还需要下列依赖项:
<include>org.apache.storm:storm-kafka</include>
<include>org.apache.kafka:kafka_2.10</include>
<include>org.apache.curator:curator-client</include>
<include>org.apache.curator:curator-framework</include>
<include>com.google.guava:guava</include>
<include>com.yammer.metrics:metrics-core</include>