我是三叉戟的新手。我正在写一个从kafka读取数据的三叉戟拓扑。主题名为"test"。我有本地kafka设置。我开始做动物园管理员,当地的卡夫卡。然后在kafka中创建一个主题test,然后打开producer并输入消息Hello kafka。
我想使用trident从'test'主题读取消息'Hello Kafka'。
下面是我的代码。我得到一个空元组。
TridentTopology topology = new TridentTopology();
BrokerHosts brokerHosts = new ZkHosts("localhost:2181");
TridentKafkaConfig kafkaConfig = new TridentKafkaConfig(brokerHosts, "test");
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
kafkaConfig.bufferSizeBytes = 1024 * 1024 * 4;
kafkaConfig.fetchSizeBytes = 1024 * 1024 * 4;
kafkaConfig.forceFromStart = false;
OpaqueTridentKafkaSpout opaqueTridentKafkaSpout = new OpaqueTridentKafkaSpout(kafkaConfig);
topology.newStream("TestSpout", opaqueTridentKafkaSpout).parallelismHint(1)
.each(new Fields(), new TestFilter()).parallelismHint(1)
.each(new Fields(), new Utils.PrintFilter());
这是我的TestFilter类代码
public TestFilter()
{
//
}
@Override
public boolean isKeep(TridentTuple tuple) {
boolean isKeep=true;
System.out.println("TestFilter is called...");
if (tuple != null && tuple.getValues().size()>0) {
System.out.println("data from kafka ::: "+tuple.getValues());
}
return isKeep;
}
每当我在kafka生产者中输入消息到'test'主题时,第一个sysout被打印出来,但它没有通过if循环。我只是得到消息'TestFilter被称为…"仅此而已。
我想获得我为"test"主题生成的实际数据。如何?
问题在于Stream.each的参数。该方法的javadoc的相关部分是:
each(Fields inputFields, Filter filter)
文档对此不是很清楚,但语义是您应该使用inputFields参数指定过滤器使用的所有字段。
Storm将在输入元组上应用投影,并将其转发给过滤器。
假设您没有指定任何输入字段,那么投影将导致一个空元组,从而导致过滤器内的tuple.getValues().size()>0
条件失败。
还值得一提的是每个的其他变体:
each(Fields inputFields, Function function, Fields functionFields)
each(Function function, Fields functionFields)
这些将对输入元组的投影应用提供的函数,将结果元组追加到原始输入元组,将新字段重命名为functionFields(即投影仅用于应用函数)。
特别是第二个版本相当于调用每个inputFields设置为null(或new Fields()
),并将导致一个空元组传递给函数。