Storm Kafka Spout not working properly



Overview: I am a student and want to run some performance tests (WordCount) on Storm/Kafka/Flink/MS Azure SA/Spark. I want to use a Kafka broker as the input source.

I took the WordCount example from the StormStarter project and added Kafka as the spout:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.task.ShellBolt;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCountKafkaTopology {

    // Splits each sentence into words by delegating to a Python script.
    public static class SplitSentence extends ShellBolt implements IRichBolt {
        public SplitSentence() {
            super("python", "splitsentence.py");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    // Keeps a running count per word and emits (word, count) on every update.
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null)
                count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) {
        String zkIp = "localhost";
        String topicName = "perfTest";
        List<String> nimbus_seeds = new ArrayList<String>();
        nimbus_seeds.add("localhost");
        String zookeeperHost = zkIp + ":2181";
        ZkHosts zkHosts = new ZkHosts(zookeeperHost);
        SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topicName, "/" + topicName, topicName);
        kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafkaPerfTestSpout", kafkaSpout, 8);
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("kafkaPerfTestSpout");
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
        Config config = new Config();
        config.setMaxTaskParallelism(5);
        config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);
        config.put(Config.NIMBUS_SEEDS, nimbus_seeds);
        config.put(Config.NIMBUS_THRIFT_PORT, 6627);
        config.put(Config.STORM_ZOOKEEPER_PORT, 2181);
        config.put(Config.STORM_ZOOKEEPER_SERVERS, Arrays.asList(zkIp));
        try {
            StormSubmitter.submitTopology("my-kafka-topology", config, builder.createTopology());
        } catch (Exception e) {
            throw new IllegalStateException("Couldn't initialize the topology", e);
        }
    }
}

When I run the topology, I get some error messages. The spout reports:

java.lang.ExceptionInInitializerError
    at kafka.metrics.KafkaMetricsGroup$class.newTimer(KafkaMetricsGroup.scala:89)
    at kafka.consumer.FetchRequestAndResponseMetrics.newTimer(FetchRequestAndResponseStats.scala:26)
    at kafka.consumer.FetchRequestAndResponseMetrics.<init>(FetchRequestAndResponseStats.scala:35)
    at kafka.consumer.FetchRequestAndResponseStats.<init>(FetchRequestAndResponseStats.scala:47)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(FetchRequestAndResponseStats.scala:60)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$$anonfun$2.apply(...)
    at kafka.utils.Pool.getAndMaybePut(Pool.scala:59)
    at kafka.consumer.FetchRequestAndResponseStatsRegistry$.getFetchRequestAndResponseStats(FetchRequestAndResponseStats.scala:64)
    at kafka.consumer.SimpleConsumer.<init>(SimpleConsumer.scala:44)
    at kafka.javaapi.consumer.SimpleConsumer.<init>(...)
    [...] (PartitionManager.java:74)
    at org.apache.storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98)
    at org.apache.storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69)
    at org.apache.storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:129)
    at org.apache.storm.daemon.executor$fn__7990$fn__8005$fn__8036.invoke(executor.clj:648)
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Shutdown in progress
    at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
    [...]
    at com.yammer.metrics.Metrics.<clinit>(Metrics.java:21)

And the split bolt reports:

java.lang.RuntimeException: java.lang.RuntimeException: java.lang.RuntimeException: pid:3973, name:split exitCode:0, errorString:
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:464)
    [...]
    at org.apache.storm.util$async_loop$fn__624.invoke(util.clj:484)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.RuntimeException: pid:3973, name:split exitCode:0, errorString:
    at org.apache.storm.task.ShellBolt.execute(ShellBolt.java:150)
    [...]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:451)
    ... 6 more
Caused by: java.lang.RuntimeException: pid:3973, name:split exitCode:0, errorString:
    at org.apache.storm.task.ShellBolt.die(ShellBolt.java:295)
    at org.apache.storm.task.ShellBolt.access$400(ShellBolt.java:70)
    at org.apache.storm.task.ShellBolt$BoltWriterRunnable.run(ShellBolt.java:398)
    ... 1 more
Caused by: java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:326)
    at java.io.BufferedOutputStream.flushBuffer(...)
    [...]
    at org.apache.storm.multilang.JsonSerializer.writeString(...)
    ... 1 more

I use the Kafka console producer to generate some messages. I hope someone can help me; I'm new to programming with Storm.

Removing "config.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 2);" did the trick!
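
The thread does not say why removing that line helps. One plausible explanation, offered as an assumption: a topology-wide tick frequency makes Storm deliver tick tuples to every bolt, including the ShellBolt-based split bolt, whose script expects a sentence string. If tick tuples are actually needed, a bolt can recognise and skip them by checking where the tuple came from. The sketch below shows that check in plain Java with no Storm dependency. SimpleTuple, TickTupleSketch, and the sample payloads are hypothetical stand-ins; only the two string constants are meant to mirror Storm's system component id and tick stream id.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-alone sketch of tick-tuple filtering; not part of the Storm API. */
public class TickTupleSketch {
    // Values intended to mirror Storm's Constants.SYSTEM_COMPONENT_ID
    // and Constants.SYSTEM_TICK_STREAM_ID.
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    /** Hypothetical minimal tuple: its origin plus a single value. */
    static final class SimpleTuple {
        final String sourceComponent;
        final String sourceStreamId;
        final Object value;

        SimpleTuple(String sourceComponent, String sourceStreamId, Object value) {
            this.sourceComponent = sourceComponent;
            this.sourceStreamId = sourceStreamId;
            this.value = value;
        }
    }

    /** A tuple is a tick when it comes from the system component on the tick stream. */
    static boolean isTick(SimpleTuple t) {
        return SYSTEM_COMPONENT_ID.equals(t.sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(t.sourceStreamId);
    }

    /** Splits real sentences into words; tick tuples yield no words. */
    static List<String> split(SimpleTuple t) {
        List<String> words = new ArrayList<>();
        if (isTick(t)) {
            return words; // skip instead of trying to parse a non-sentence payload
        }
        for (String w : String.valueOf(t.value).trim().split("\\s+")) {
            if (!w.isEmpty()) {
                words.add(w);
            }
        }
        return words;
    }

    public static void main(String[] args) {
        SimpleTuple sentence =
                new SimpleTuple("kafkaPerfTestSpout", "default", "the quick brown fox");
        // The payload 2 is an illustrative placeholder for a tick tuple's value.
        SimpleTuple tick = new SimpleTuple(SYSTEM_COMPONENT_ID, SYSTEM_TICK_STREAM_ID, 2);
        System.out.println(split(sentence)); // [the, quick, brown, fox]
        System.out.println(split(tick));     // []
    }
}
```

In a real topology the same comparison would be made against the tuple's source component and source stream id. As far as I know, Storm also ships a helper for this, TupleUtils.isTick(tuple) in org.apache.storm.utils. For a bolt that genuinely needs ticks, setting the frequency in that bolt's getComponentConfiguration() instead of the topology-wide Config keeps them away from the other bolts.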
