使用Trident拓扑从kafka读取数据时清空数据



我是三叉戟的新手。我正在写一个从kafka读取数据的三叉戟拓扑。主题名为"test"。我有本地kafka设置。我开始做动物园管理员,当地的卡夫卡。然后在kafka中创建一个主题test,然后打开producer并输入消息Hello kafka。

我想使用trident从'test'主题读取消息'Hello Kafka'。

下面是我的代码。我得到一个空元组。

    TridentTopology topology = new TridentTopology();
    BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
    TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "test");
    kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
    kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
    kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
    kafkaConfig.forceFromStart = false;
    OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
    topology.newStream("TestSpout", opaqueTridentKafkaSpout).parallelismHint(1)
      .each(new Fields(), new TestFilter()).parallelismHint(1)
      .each(new Fields(), new Utils.PrintFilter());

这是我的TestFilter类代码

public TestFilter()
{
    //
}
@Override
public boolean isKeep(TridentTuple tuple) {
    boolean isKeep=true;
    System.out.println("TestFilter is called...");
    if (tuple != null && tuple.getValues().size()>0) {
        System.out.println("data from kafka ::: "+tuple.getValues());
    } 
    return isKeep;
}

每当我在kafka生产者中输入消息到'test'主题时,第一个sysout被打印出来,但它没有通过if循环。我只是得到消息'TestFilter被称为…"仅此而已。

我想获得我为"test"主题生成的实际数据。如何?

问题在于Stream.each的参数。该方法的javadoc的相关部分是:

each(Fields inputFields, Filter filter)

文档对此不是很清楚,但语义是您应该使用inputFields参数指定过滤器使用的所有字段。

Storm将在输入元组上应用投影,并将其转发给过滤器

假设您没有指定任何输入字段,那么投影将导致一个空元组,从而导致过滤器内的tuple.getValues().size()>0条件失败。

还值得一提的是每个的其他变体:

each(Fields inputFields, Function function, Fields functionFields)
each(Function function, Fields functionFields)

这些将对输入元组的投影应用提供的函数,将结果元组追加到原始输入元组,将新字段重命名为functionFields(即投影仅用于应用函数)。

特别是第二个版本相当于调用每个inputFields设置为null(或new Fields()),并将导致一个空元组传递给函数

最新更新