使用Kafka Spout的Kafka Storm集成



我正在使用KafkaSpout。请在下面找到测试程序。

我使用的是Storm 0.8.1。风暴0.8.2中有多模式等级。我将使用它。我只是想知道早期版本是如何通过实例化StringScheme()类来工作的?在哪里可以下载Kafka Spout的早期版本?但我怀疑这是否是一个正确的替代方案,而不是在风暴0.8.2上工作???(困惑)

当我在风暴集群上运行代码(如下所示)时(即,当我推送拓扑时),我会得到以下错误(当Scheme部分被注释时会发生这种情况,否则我当然会得到编译器错误,因为类不在0.8.1中):

java.lang.NoClassDefFoundError: backtype/storm/spout/MultiScheme
        at storm.kafka.TestTopology.main(TestTopology.java:37)
Caused by: java.lang.ClassNotFoundException: backtype.storm.spout.MultiScheme

在下面给出的代码中,您可能会发现spootConfig.scheme=new StringScheme();部分评论道。如果我不评论那一行,我就会得到编译器错误,这很自然,因为那里没有构造函数。此外,当我实例化MultiScheme时,我会出错,因为我在0.8.1中没有那个类。

public class TestTopology {
    public static class PrinterBolt extends BaseBasicBolt {
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
        }
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            System.out.println(tuple.toString());
        }
    }
    public static void main(String [] args) throws Exception {
        List<HostPort> hosts = new ArrayList<HostPort>();
        hosts.add(new HostPort("127.0.0.1",9092));
        LocalCluster cluster = new LocalCluster();
        TopologyBuilder builder = new TopologyBuilder();
        SpoutConfig spoutConfig = new SpoutConfig(new KafkaConfig.StaticHosts(hosts, 1), "test", "/zkRootStorm", "STORM-ID");
        spoutConfig.zkServers=ImmutableList.of("localhost");
        spoutConfig.zkPort=2181;
        //spoutConfig.scheme=new StringScheme();
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
        builder.setSpout("spout",new KafkaSpout(spoutConfig));
        builder.setBolt("printer", new PrinterBolt())
                .shuffleGrouping("spout");
        Config config = new Config();
        cluster.submitTopology("kafka-test", config, builder.createTopology());
        Thread.sleep(600000);
    }

我也遇到了同样的问题。最后解决了这个问题,我把完整的运行示例放到了github上。

欢迎您在此处查看>https://github.com/buildlackey/cep

(点击storm+kafka目录中的一个示例程序,它应该会让你启动并运行)。

我们也遇到了类似的问题。

我们的解决方案:

  1. 打开pom.xml

  2. 将范围从提供更改为<scope>compile</scope>

如果你想了解更多关于依赖范围的信息,请查看maven文档:Maven docu-依赖范围

最新更新