Kafka Spout的字段分组



字段分组可以在kafka喷口发出的元组上完成吗?如果是,那么Storm是如何了解卡夫卡记录中的字段的?

Storm中的字段分组(以及一般分组)用于螺栓,而不是喷嘴。这是通过InputDeclarer类完成的
当您在TopologyBuilder上调用setBolt()时,会返回InputDeclarer

Kafka-Spout像声明其他组件一样声明其输出字段。我的解释是基于KafkaSpout的当前实现。

在KafkaSpout.java类中,我们看到declareOutputFields方法,它调用KafkaConfig Scheme的getOutputFields()方法。

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(_spoutConfig.scheme.getOutputFields());
}

默认情况下,KafkaConfig使用RawMultiScheme来实现这种方法。

  @Override
  public Fields getOutputFields() {
    return new Fields("bytes");
  }

那么这意味着什么呢?,如果您使用fieldGrouping声明了从KafkaSpout读取元组的bolt,您就会知道每个包含等于字段"字节"的元组都将由同一任务执行如果您想发出任何字段,您应该根据需要实现新的方案。

TL:DR KafkaSpout的默认实现在declareOutputFields:中声明了以下输出字段

new Fields("topic", "partition", "offset", "key", "value");

所以在构建拓扑代码时直接做:

topologyBuilder.setSpout(spoutName, mySpout, parallelismHintSpout);
topologyBuilder.setBolt(boltName, myBolt, parallelismHintBolt).fieldsGrouping(spoutName, new Fields("key"));

详细信息:稍微研究一下代码告诉:

在Kafka Spout中,declareOutputFields以以下方式实现:

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
    for (String stream : translator.streams()) {
        declarer.declareStream(stream, translator.getFieldsFor(stream));
    }
}

它从RecordTranslator接口获取字段,其实例从kafkaSpoutConfigKafkaSpoutConfig<K, V>获取。KafkaSpoutConfig<K, V>CommonKafkaSpoutConfig扩展而来(不过在1.1.1版本中略有不同)。该生成器返回DefaultRecordTranslator。如果你检查这个类实现中的字段,你会发现:

public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");

因此,我们可以在拓扑代码中的字段分组中直接使用Fields("key")

topologyBuilder.setBolt(boltName, myBolt, parallelismHintBolt).fieldsGrouping(spoutName, new Fields("key"));

最新更新