字段分组可以在kafka喷口发出的元组上完成吗?如果是,那么Storm是如何了解卡夫卡记录中的字段的?
Storm中的字段分组(以及一般分组)用于螺栓,而不是喷嘴。这是通过InputDeclarer
类完成的
当您在TopologyBuilder
上调用setBolt()
时,会返回InputDeclarer
。
Kafka-Spout像声明其他组件一样声明其输出字段。我的解释是基于KafkaSpout的当前实现。
在KafkaSpout.java类中,我们看到declareOutputFields方法,它调用KafkaConfig Scheme的getOutputFields()方法。
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(_spoutConfig.scheme.getOutputFields());
}
默认情况下,KafkaConfig使用RawMultiScheme来实现这种方法。
@Override
public Fields getOutputFields() {
return new Fields("bytes");
}
那么这意味着什么呢?,如果您使用fieldGrouping声明了从KafkaSpout读取元组的bolt,您就会知道每个包含等于字段"字节"的元组都将由同一任务执行如果您想发出任何字段,您应该根据需要实现新的方案。
TL:DR
KafkaSpout的默认实现在declareOutputFields
:中声明了以下输出字段
new Fields("topic", "partition", "offset", "key", "value");
所以在构建拓扑代码时直接做:
topologyBuilder.setSpout(spoutName, mySpout, parallelismHintSpout);
topologyBuilder.setBolt(boltName, myBolt, parallelismHintBolt).fieldsGrouping(spoutName, new Fields("key"));
详细信息:稍微研究一下代码告诉:
在Kafka Spout中,declareOutputFields
以以下方式实现:
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
for (String stream : translator.streams()) {
declarer.declareStream(stream, translator.getFieldsFor(stream));
}
}
它从RecordTranslator
接口获取字段,其实例从kafkaSpoutConfig
即KafkaSpoutConfig<K, V>
获取。KafkaSpoutConfig<K, V>
从CommonKafkaSpoutConfig
扩展而来(不过在1.1.1版本中略有不同)。该生成器返回DefaultRecordTranslator
。如果你检查这个类实现中的字段,你会发现:
public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
因此,我们可以在拓扑代码中的字段分组中直接使用Fields("key")
topologyBuilder.setBolt(boltName, myBolt, parallelismHintBolt).fieldsGrouping(spoutName, new Fields("key"));